From 66a06e9be7e1cd7fb0c6ea83ccade3b3e03bcdad Mon Sep 17 00:00:00 2001 From: Sri Mandaleeka Date: Mon, 27 Apr 2020 14:33:17 -0400 Subject: [PATCH 01/13] changes to support new purge task executions and st2 purge workflow --- st2common/setup.py | 2 ++ st2common/st2common/models/db/__init__.py | 14 ++++++++++++++ st2common/st2common/persistence/workflow.py | 8 ++++++++ 3 files changed, 24 insertions(+) diff --git a/st2common/setup.py b/st2common/setup.py index 3e7bd4fb5c..fe975c3929 100644 --- a/st2common/setup.py +++ b/st2common/setup.py @@ -49,6 +49,8 @@ 'bin/st2-cleanup-db', 'bin/st2-register-content', 'bin/st2-purge-executions', + 'bin/st2-purge-workflows', + 'bin/st2-purge-task-executions', 'bin/st2-purge-trigger-instances', 'bin/st2-run-pack-tests', 'bin/st2ctl', diff --git a/st2common/st2common/models/db/__init__.py b/st2common/st2common/models/db/__init__.py index 098a8e45da..06392cccdf 100644 --- a/st2common/st2common/models/db/__init__.py +++ b/st2common/st2common/models/db/__init__.py @@ -475,6 +475,7 @@ def delete_by_query(self, *args, **query): """ Delete objects by query and return number of deleted objects. """ + super(self) qs = self.model.objects.filter(*args, **query) count = qs.delete() log_query_and_profile_data_for_queryset(queryset=qs) @@ -596,6 +597,19 @@ def save(self, instance, validate=True): return self._undo_dict_field_escape(instance) + def delete(self, instance): + return instance.delete() + + def delete_by_query(self, *args, **query): + """ + Delete objects by query and return number of deleted objects. + """ + qs = self.model.objects.filter(*args, **query) + count = qs.delete() + log_query_and_profile_data_for_queryset(queryset=qs) + + return count + def get_host_names_for_uri_dict(uri_dict): hosts = [] diff --git a/st2common/st2common/persistence/workflow.py b/st2common/st2common/persistence/workflow.py index aade4b648d..aa3f365475 100644 --- a/st2common/st2common/persistence/workflow.py +++ b/st2common/st2common/persistence/workflow.py @@ -41,6 +41,10 @@ def _get_publisher(cls): return cls.publisher + @classmethod + def delete_by_query(cls, *args, **query): + return cls._get_impl().delete_by_query(*args, **query) + class TaskExecution(persistence.StatusBasedResource): impl = db.ChangeRevisionMongoDBAccess(wf_db_models.TaskExecutionDB) @@ -49,3 +53,7 @@ class TaskExecution(persistence.StatusBasedResource): @classmethod def _get_impl(cls): return cls.impl + + @classmethod + def delete_by_query(cls, *args, **query): + return cls._get_impl().delete_by_query(*args, **query) From 5f1dab8bc63e437668196e335d852057fe04f9bc Mon Sep 17 00:00:00 2001 From: Sri Mandaleeka Date: Tue, 28 Apr 2020 09:29:21 -0400 Subject: [PATCH 02/13] Support new purge task executions and st2 purge workflow --- st2common/bin/st2-purge-task-executions | 22 +++ st2common/bin/st2-purge-workflows | 22 +++ .../st2common/cmd/purge_task_executions.py | 88 ++++++++++ st2common/st2common/cmd/purge_workflows.py | 88 ++++++++++ .../st2common/garbage_collection/workflows.py | 160 ++++++++++++++++++ .../tests/unit/test_purge_task_executions.py | 72 ++++++++ st2common/tests/unit/test_purge_worklows.py | 72 ++++++++ 7 files changed, 524 insertions(+) create mode 100644 st2common/bin/st2-purge-task-executions create mode 100644 st2common/bin/st2-purge-workflows create mode 100644 st2common/st2common/cmd/purge_task_executions.py create mode 100644 st2common/st2common/cmd/purge_workflows.py create mode 100644 st2common/st2common/garbage_collection/workflows.py create mode 100644 st2common/tests/unit/test_purge_task_executions.py create mode 100644 st2common/tests/unit/test_purge_worklows.py diff --git a/st2common/bin/st2-purge-task-executions b/st2common/bin/st2-purge-task-executions new file mode 100644 index 0000000000..eb052676d7 --- /dev/null +++ b/st2common/bin/st2-purge-task-executions @@ -0,0 +1,22 @@ +#!/usr/bin/env python +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +from st2common.cmd.purge_task_executions import main + +if __name__ == '__main__': + sys.exit(main()) \ No newline at end of file diff --git a/st2common/bin/st2-purge-workflows b/st2common/bin/st2-purge-workflows new file mode 100644 index 0000000000..0480c22131 --- /dev/null +++ b/st2common/bin/st2-purge-workflows @@ -0,0 +1,22 @@ +#!/usr/bin/env python +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +from st2common.cmd.purge_workflows import main + +if __name__ == '__main__': + sys.exit(main()) \ No newline at end of file diff --git a/st2common/st2common/cmd/purge_task_executions.py b/st2common/st2common/cmd/purge_task_executions.py new file mode 100644 index 0000000000..2d6155c25b --- /dev/null +++ b/st2common/st2common/cmd/purge_task_executions.py @@ -0,0 +1,88 @@ +# Copyright 2019 Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +""" +A utility script that purges st2 executions older than certain +timestamp. + +*** RISK RISK RISK. You will lose data. Run at your own risk. *** +""" + +from __future__ import absolute_import + +from datetime import datetime + +import six +import pytz +from oslo_config import cfg + +from st2common import config +from st2common import log as logging +from st2common.config import do_register_cli_opts +from st2common.script_setup import setup as common_setup +from st2common.script_setup import teardown as common_teardown +from st2common.constants.exit_codes import SUCCESS_EXIT_CODE +from st2common.constants.exit_codes import FAILURE_EXIT_CODE +from st2common.garbage_collection.workflows import purge_task_execution + +__all__ = [ + 'main' +] + +LOG = logging.getLogger(__name__) + + +def _register_cli_opts(): + cli_opts = [ + cfg.StrOpt('timestamp', default=None, + help='Will delete execution and liveaction models older than ' + + 'this UTC timestamp. ' + + 'Example value: 2015-03-13T19:01:27.255542Z.'), + cfg.StrOpt('action-ref', default='', + help='action-ref to delete executions for.'), + cfg.BoolOpt('purge-incomplete', default=False, + help='Purge all models irrespective of their ``status``.' + + 'By default, only executions in completed states such as "succeeeded" ' + + ', "failed", "canceled" and "timed_out" are deleted.'), + ] + do_register_cli_opts(cli_opts) + + +def main(): + _register_cli_opts() + common_setup(config=config, setup_db=True, register_mq_exchanges=False) + + # Get config values + timestamp = cfg.CONF.timestamp + action_ref = cfg.CONF.action_ref + purge_incomplete = cfg.CONF.purge_incomplete + + if not timestamp: + LOG.error('Please supply a timestamp for purging models. Aborting.') + return 1 + else: + timestamp = datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%fZ') + timestamp = timestamp.replace(tzinfo=pytz.UTC) + + try: + purge_task_execution(logger=LOG, timestamp=timestamp, action_ref=action_ref, + purge_incomplete=purge_incomplete) + except Exception as e: + LOG.exception(six.text_type(e)) + return FAILURE_EXIT_CODE + finally: + common_teardown() + + return SUCCESS_EXIT_CODE diff --git a/st2common/st2common/cmd/purge_workflows.py b/st2common/st2common/cmd/purge_workflows.py new file mode 100644 index 0000000000..58479526c1 --- /dev/null +++ b/st2common/st2common/cmd/purge_workflows.py @@ -0,0 +1,88 @@ +# Copyright 2019 Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +""" +A utility script that purges st2 executions older than certain +timestamp. + +*** RISK RISK RISK. You will lose data. Run at your own risk. *** +""" + +from __future__ import absolute_import + +from datetime import datetime + +import six +import pytz +from oslo_config import cfg + +from st2common import config +from st2common import log as logging +from st2common.config import do_register_cli_opts +from st2common.script_setup import setup as common_setup +from st2common.script_setup import teardown as common_teardown +from st2common.constants.exit_codes import SUCCESS_EXIT_CODE +from st2common.constants.exit_codes import FAILURE_EXIT_CODE +from st2common.garbage_collection.workflows import purge_workflow_execution + +__all__ = [ + 'main' +] + +LOG = logging.getLogger(__name__) + + +def _register_cli_opts(): + cli_opts = [ + cfg.StrOpt('timestamp', default=None, + help='Will delete execution and liveaction models older than ' + + 'this UTC timestamp. ' + + 'Example value: 2015-03-13T19:01:27.255542Z.'), + cfg.StrOpt('action-ref', default='', + help='action-ref to delete executions for.'), + cfg.BoolOpt('purge-incomplete', default=False, + help='Purge all models irrespective of their ``status``.' + + 'By default, only executions in completed states such as "succeeeded" ' + + ', "failed", "canceled" and "timed_out" are deleted.'), + ] + do_register_cli_opts(cli_opts) + + +def main(): + _register_cli_opts() + common_setup(config=config, setup_db=True, register_mq_exchanges=False) + + # Get config values + timestamp = cfg.CONF.timestamp + action_ref = cfg.CONF.action_ref + purge_incomplete = cfg.CONF.purge_incomplete + + if not timestamp: + LOG.error('Please supply a timestamp for purging models. Aborting.') + return 1 + else: + timestamp = datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%fZ') + timestamp = timestamp.replace(tzinfo=pytz.UTC) + + try: + purge_workflow_execution(logger=LOG, timestamp=timestamp, action_ref=action_ref, + purge_incomplete=purge_incomplete) + except Exception as e: + LOG.exception(six.text_type(e)) + return FAILURE_EXIT_CODE + finally: + common_teardown() + + return SUCCESS_EXIT_CODE diff --git a/st2common/st2common/garbage_collection/workflows.py b/st2common/st2common/garbage_collection/workflows.py new file mode 100644 index 0000000000..8a2325d3e8 --- /dev/null +++ b/st2common/st2common/garbage_collection/workflows.py @@ -0,0 +1,160 @@ +# Copyright 2020 Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Module with utility functions for purging old action workflows. +""" +from __future__ import absolute_import + +import copy + +import six +from mongoengine.errors import InvalidQueryError + +from st2common.constants import action as action_constants +from st2common.persistence.workflow import WorkflowExecution +from st2common.persistence.workflow import TaskExecution + + +__all__ = [ + 'purge_workflow_execution', + 'purge_task_execution' +] + +#TODO: Are these valid too.. +DONE_STATES = [action_constants.LIVEACTION_STATUS_SUCCEEDED, + action_constants.LIVEACTION_STATUS_FAILED, + action_constants.LIVEACTION_STATUS_TIMED_OUT, + action_constants.LIVEACTION_STATUS_CANCELED] + + +def purge_workflow_execution(logger, timestamp, action_ref=None, purge_incomplete=False): + """ + Purge action executions and corresponding live action, execution output objects. + + :param timestamp: Exections older than this timestamp will be deleted. + :type timestamp: ``datetime.datetime + + :param action_ref: Only delete executions for the provided actions. + :type action_ref: ``str`` + + :param purge_incomplete: True to also delete executions which are not in a done state. + :type purge_incomplete: ``bool`` + """ + if not timestamp: + raise ValueError('Specify a valid timestamp to purge.') + + logger.info('Purging executions older than timestamp: %s' % + timestamp.strftime('%Y-%m-%dT%H:%M:%S.%fZ')) + + filters = {} + + if purge_incomplete: + filters['start_timestamp__lt'] = timestamp + else: + filters['end_timestamp__lt'] = timestamp + filters['start_timestamp__lt'] = timestamp + filters['status'] = {'$in': DONE_STATES} + + exec_filters = copy.copy(filters) + # TODO: Are these parameters valid. + # if action_ref: + # exec_filters['action__ref'] = action_ref + # + # liveaction_filters = copy.deepcopy(filters) + # if action_ref: + # liveaction_filters['action'] = action_ref + + # 1. Delete ActionExecutionDB objects + try: + # Note: We call list() on the query set object because it's lazyily evaluated otherwise + # to_delete_execution_dbs = list(WorkflowExecution.query(only_fields=['id'], + # no_dereference=True, + # **exec_filters)) + deleted_count = WorkflowExecution.delete_by_query(**exec_filters) + except InvalidQueryError as e: + msg = ('Bad query (%s) used to delete execution instances: %s' + 'Please contact support.' % (exec_filters, six.text_type(e))) + raise InvalidQueryError(msg) + except: + logger.exception('Deletion of execution models failed for query with filters: %s.', + exec_filters) + else: + logger.info('Deleted %s action execution objects' % deleted_count) + + zombie_execution_instances = len(WorkflowExecution.query(only_fields=['id'], + no_dereference=True, + **exec_filters)) + + if zombie_execution_instances > 0: + logger.error('Zombie execution instances left: %d.', zombie_execution_instances) + + # Print stats + logger.info('All execution models older than timestamp %s were deleted.', timestamp) + + +def purge_task_execution(logger, timestamp, action_ref=None, purge_incomplete=False): + """ + Purge action executions and corresponding live action, execution output objects. + + :param timestamp: Exections older than this timestamp will be deleted. + :type timestamp: ``datetime.datetime + + :param action_ref: Only delete executions for the provided actions. + :type action_ref: ``str`` + + :param purge_incomplete: True to also delete executions which are not in a done state. + :type purge_incomplete: ``bool`` + """ + if not timestamp: + raise ValueError('Specify a valid timestamp to purge.') + + logger.info('Purging executions older than timestamp: %s' % + timestamp.strftime('%Y-%m-%dT%H:%M:%S.%fZ')) + + filters = {} + + if purge_incomplete: + filters['start_timestamp__lt'] = timestamp + else: + filters['end_timestamp__lt'] = timestamp + filters['start_timestamp__lt'] = timestamp + filters['status'] = {'$in': DONE_STATES} + + exec_filters = copy.copy(filters) + try: + # Note: We call list() on the query set object because it's lazyily evaluated otherwise + # to_delete_execution_dbs = list(WorkflowExecution.query(only_fields=['id'], + # no_dereference=True, + # **exec_filters)) + deleted_count = TaskExecution.delete_by_query(**exec_filters) + except InvalidQueryError as e: + msg = ('Bad query (%s) used to delete execution instances: %s' + 'Please contact support.' % (exec_filters, six.text_type(e))) + raise InvalidQueryError(msg) + except: + logger.exception('Deletion of execution models failed for query with filters: %s.', + exec_filters) + else: + logger.info('Deleted %s action execution objects' % deleted_count) + + zombie_execution_instances = len(WorkflowExecution.query(only_fields=['id'], + no_dereference=True, + **exec_filters)) + + if zombie_execution_instances > 0: + logger.error('Zombie execution instances left: %d.', zombie_execution_instances) + + # Print stats + logger.info('All execution models older than timestamp %s were deleted.', timestamp) \ No newline at end of file diff --git a/st2common/tests/unit/test_purge_task_executions.py b/st2common/tests/unit/test_purge_task_executions.py new file mode 100644 index 0000000000..e67934d660 --- /dev/null +++ b/st2common/tests/unit/test_purge_task_executions.py @@ -0,0 +1,72 @@ +# Copyright 2019 Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import +from datetime import timedelta + +from st2common import log as logging +from st2common.constants.triggers import TRIGGER_INSTANCE_PROCESSED +from st2common.garbage_collection.workflows import purge_task_execution +from st2common.models.db.workflow import TaskExecutionDB +from st2common.persistence.workflow import TaskExecution +from st2common.util import date as date_utils +from st2tests.base import CleanDbTestCase + +LOG = logging.getLogger(__name__) + + +class TestPurgeTaskExecutionInstances(CleanDbTestCase): + + @classmethod + def setUpClass(cls): + CleanDbTestCase.setUpClass() + super(TestPurgeTaskExecutionInstances, cls).setUpClass() + + def setUp(self): + super(TestPurgeTaskExecutionInstances, self).setUp() + + def test_no_timestamp_doesnt_delete(self): + now = date_utils.get_datetime_utc_now() + + instance_db = TaskExecutionDB(trigger='purge_tool.dummy.trigger', + payload={'hola': 'hi', 'kuraci': 'chicken'}, + occurrence_time=now - timedelta(days=20), + status='succeeded') + TaskExecution.add_or_update(instance_db) + + self.assertEqual(len(TaskExecution.get_all()), 1) + expected_msg = 'Specify a valid timestamp' + self.assertRaisesRegexp(ValueError, expected_msg, + purge_task_execution, + logger=LOG, timestamp=None) + self.assertEqual(len(TaskExecution.get_all()), 1) + + def test_purge(self): + now = date_utils.get_datetime_utc_now() + + instance_db = TaskExecutionDB(trigger='purge_tool.dummy.trigger', + payload={'hola': 'hi', 'kuraci': 'chicken'}, + occurrence_time=now - timedelta(days=20), + status='failed') + TaskExecution.add_or_update(instance_db) + + instance_db = TaskExecutionDB(trigger='purge_tool.dummy.trigger', + payload={'hola': 'hi', 'kuraci': 'chicken'}, + occurrence_time=now - timedelta(days=5), + status='canceled') + TaskExecution.add_or_update(instance_db) + + self.assertEqual(len(TaskExecution.get_all()), 2) + purge_task_execution(logger=LOG, timestamp=now - timedelta(days=10)) + self.assertEqual(len(TaskExecution.get_all()), 1) diff --git a/st2common/tests/unit/test_purge_worklows.py b/st2common/tests/unit/test_purge_worklows.py new file mode 100644 index 0000000000..5f6620c7ff --- /dev/null +++ b/st2common/tests/unit/test_purge_worklows.py @@ -0,0 +1,72 @@ +# Copyright 2019 Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import +from datetime import timedelta + +from st2common import log as logging +from st2common.constants.triggers import TRIGGER_INSTANCE_PROCESSED +from st2common.garbage_collection.workflows import purge_workflow_execution +from st2common.models.db.workflow import WorkflowExecutionDB +from st2common.persistence.workflow import WorkflowExecution +from st2common.util import date as date_utils +from st2tests.base import CleanDbTestCase + +LOG = logging.getLogger(__name__) + + +class TestPurgeWorkflowExecutionInstances(CleanDbTestCase): + + @classmethod + def setUpClass(cls): + CleanDbTestCase.setUpClass() + super(TestPurgeWorkflowExecutionInstances, cls).setUpClass() + + def setUp(self): + super(TestPurgeWorkflowExecutionInstances, self).setUp() + + def test_no_timestamp_doesnt_delete(self): + now = date_utils.get_datetime_utc_now() + + instance_db = WorkflowExecutionDB(trigger='purge_tool.dummy.trigger', + payload={'hola': 'hi', 'kuraci': 'chicken'}, + occurrence_time=now - timedelta(days=20), + status='running') + WorkflowExecution.add_or_update(instance_db) + + self.assertEqual(len(WorkflowExecution.get_all()), 1) + expected_msg = 'Specify a valid timestamp' + self.assertRaisesRegexp(ValueError, expected_msg, + purge_workflow_execution, + logger=LOG, timestamp=None) + self.assertEqual(len(WorkflowExecution.get_all()), 1) + + def test_purge(self): + now = date_utils.get_datetime_utc_now() + + instance_db = WorkflowExecutionDB(trigger='purge_tool.dummy.trigger', + payload={'hola': 'hi', 'kuraci': 'chicken'}, + occurrence_time=now - timedelta(days=20), + status='running') + WorkflowExecution.add_or_update(instance_db) + + instance_db = WorkflowExecutionDB(trigger='purge_tool.dummy.trigger', + payload={'hola': 'hi', 'kuraci': 'chicken'}, + occurrence_time=now - timedelta(days=5), + status='succeeded') + WorkflowExecution.add_or_update(instance_db) + + self.assertEqual(len(WorkflowExecution.get_all()), 2) + purge_workflow_execution(logger=LOG, timestamp=now - timedelta(days=10)) + self.assertEqual(len(WorkflowExecution.get_all()), 1) From 36c7fdf50b6675df4c1fd030cc08661e196ef551 Mon Sep 17 00:00:00 2001 From: Sri Mandaleeka Date: Wed, 24 Feb 2021 14:33:56 -0500 Subject: [PATCH 03/13] Call the purge workflow and task execution methods in the garbage collector. --- conf/st2.conf.sample | 5 ++ .../st2reactor/garbage_collector/base.py | 67 +++++++++++++++++++ 2 files changed, 72 insertions(+) diff --git a/conf/st2.conf.sample b/conf/st2.conf.sample index 758b743e75..c9956b5016 100644 --- a/conf/st2.conf.sample +++ b/conf/st2.conf.sample @@ -165,6 +165,11 @@ purge_inquiries = False sleep_delay = 2 # Trigger instances older than this value (days) will be automatically deleted. trigger_instances_ttl = None +# Workflow execution output objects (ones generated by action output streaming) older than this value (days) will be automatically deleted. +workflow_execution_ttl = 7 +# Task execution output objects (ones generated by action output streaming) older than this value (days) will be automatically deleted. +task_execution_ttl = 7 + [keyvalue] # Allow encryption of values in key value stored qualified as "secret". diff --git a/st2reactor/st2reactor/garbage_collector/base.py b/st2reactor/st2reactor/garbage_collector/base.py index 3261458677..0e2f529f4c 100644 --- a/st2reactor/st2reactor/garbage_collector/base.py +++ b/st2reactor/st2reactor/garbage_collector/base.py @@ -40,6 +40,7 @@ from st2common.garbage_collection.executions import purge_execution_output_objects from st2common.garbage_collection.executions import purge_orphaned_workflow_executions from st2common.garbage_collection.inquiries import purge_inquiries +from st2common.garbage_collection.workflows import purge_workflow_execution, purge_task_execution from st2common.garbage_collection.trigger_instances import purge_trigger_instances __all__ = [ @@ -68,6 +69,8 @@ def __init__(self, collection_interval=DEFAULT_COLLECTION_INTERVAL, self._trigger_instances_ttl = cfg.CONF.garbagecollector.trigger_instances_ttl self._purge_inquiries = cfg.CONF.garbagecollector.purge_inquiries self._workflow_execution_max_idle = cfg.CONF.workflow_engine.gc_max_idle_sec + self._workflow_execution_ttl = cfg.CONF.garbagecollector.workflow_execution_ttl + self._task_execution_ttl = cfg.CONF.garbagecollector.task_execution_ttl self._validate_ttl_values() @@ -181,6 +184,22 @@ def _perform_garbage_collection(self): else: LOG.debug(skip_message, obj_type) + obj_type = 'task executions' + if self._task_execution_ttl and self._task_execution_ttl >= MINIMUM_TTL_DAYS: + LOG.info(proc_message, obj_type) + self._purge_task_executions() + concurrency.sleep(self._sleep_delay) + else: + LOG.debug(skip_message, obj_type) + + obj_type = 'workflow executions' + if self._workflow_execution_ttl and self._workflow_execution_ttl >= MINIMUM_TTL_DAYS: + LOG.info(proc_message, obj_type) + self._purge_workflow_executions() + concurrency.sleep(self._sleep_delay) + else: + LOG.debug(skip_message, obj_type) + def _purge_action_executions(self): """ Purge action executions and corresponding live action, stdout and stderr object which match @@ -205,6 +224,54 @@ def _purge_action_executions(self): return True + def _purge_workflow_executions(self): + """ + Purge workflow executions and corresponding live action, stdout and stderr object which match + the criteria defined in the config. + """ + utc_now = get_datetime_utc_now() + timestamp = (utc_now - datetime.timedelta(days=self._action_executions_ttl)) + + # Another sanity check to make sure we don't delete new executions + if timestamp > (utc_now - datetime.timedelta(days=MINIMUM_TTL_DAYS)): + raise ValueError('Calculated timestamp would violate the minimum TTL constraint') + + timestamp_str = isotime.format(dt=timestamp) + LOG.info('Deleting workflow executions older than: %s' % (timestamp_str)) + + assert timestamp < utc_now + + try: + purge_workflow_execution(logger=LOG, timestamp=timestamp) + except Exception as e: + LOG.exception('Failed to delete workflow executions: %s' % (six.text_type(e))) + + return True + + def _purge_task_executions(self): + """ + Purge task executions and corresponding live action, stdout and stderr object which match + the criteria defined in the config. + """ + utc_now = get_datetime_utc_now() + timestamp = (utc_now - datetime.timedelta(days=self._action_executions_ttl)) + + # Another sanity check to make sure we don't delete new executions + if timestamp > (utc_now - datetime.timedelta(days=MINIMUM_TTL_DAYS)): + raise ValueError('Calculated timestamp would violate the minimum TTL constraint') + + timestamp_str = isotime.format(dt=timestamp) + LOG.info('Deleting task executions older than: %s' % (timestamp_str)) + + assert timestamp < utc_now + + try: + purge_task_execution(logger=LOG, timestamp=timestamp) + except Exception as e: + LOG.exception('Failed to delete task executions: %s' % (six.text_type(e))) + + return True + def _purge_action_executions_output(self): utc_now = get_datetime_utc_now() timestamp = (utc_now - datetime.timedelta(days=self._action_executions_output_ttl)) From e9d9cbbd2945c4348fa0182535db523eb838407b Mon Sep 17 00:00:00 2001 From: Sri Mandaleeka Date: Wed, 24 Feb 2021 15:38:09 -0500 Subject: [PATCH 04/13] Fix unit tests ====================================================================== 1) ERROR: test_run (test_action_unload.UnloadActionTestCase) ---------------------------------------------------------------------- Traceback (most recent call last): tests/test_action_unload.py line 100 in test_run action.run(packs=[pack]) actions/pack_mgmt/unload.py line 68 in run self._unregister_rules(pack=pack) actions/pack_mgmt/unload.py line 99 in _unregister_rules cleanup_trigger_db_for_rule(rule_db=rule_db) /home/runner/work/st2/st2/st2common/st2common/services/triggers.py line 332 in cleanup_trigger_db_for_rule Trigger.delete_if_unreferenced(existing_trigger_db) /home/runner/work/st2/st2/st2common/st2common/persistence/trigger.py line 61 in delete_if_unreferenced cls._get_impl().delete_by_query(**delete_query) /home/runner/work/st2/st2/st2common/st2common/models/db/__init__.py line 479 in delete_by_query super(self) TypeError: super() argument 1 must be type, not MongoDBAccess --- st2common/st2common/models/db/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/st2common/st2common/models/db/__init__.py b/st2common/st2common/models/db/__init__.py index 93704ef542..d8572eb271 100644 --- a/st2common/st2common/models/db/__init__.py +++ b/st2common/st2common/models/db/__init__.py @@ -476,7 +476,6 @@ def delete_by_query(self, *args, **query): """ Delete objects by query and return number of deleted objects. """ - super(self) qs = self.model.objects.filter(*args, **query) count = qs.delete() log_query_and_profile_data_for_queryset(queryset=qs) From 6f036eb0fd2aec8fc442139a5c7d7b1c68bfb081 Mon Sep 17 00:00:00 2001 From: amanda Date: Fri, 1 Apr 2022 11:02:26 +0100 Subject: [PATCH 05/13] Address review comments --- CHANGELOG.rst | 3 + conf/st2.conf.sample | 8 +- .../st2common/cmd/purge_task_executions.py | 40 +++--- st2common/st2common/cmd/purge_workflows.py | 40 +++--- .../st2common/garbage_collection/workflows.py | 130 +++++++++--------- .../tests/unit/test_purge_task_executions.py | 42 +++--- st2common/tests/unit/test_purge_worklows.py | 46 ++++--- .../st2reactor/garbage_collector/base.py | 44 ++++-- .../st2reactor/garbage_collector/config.py | 10 ++ 9 files changed, 200 insertions(+), 163 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index c1091ef86a..c4c9856e00 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -155,6 +155,9 @@ Added * Added garbage collection for rule_enforcement and trace models #5596/5602 Contributed by Amanda McGuinness (@amanda11 intive) +* Added garbage collection for workflow execution and task execution objects #4924 + Contributed by @srimandaleeka01 and @amanda11 + Fixed ~~~~~ diff --git a/conf/st2.conf.sample b/conf/st2.conf.sample index 950d15a762..ce3852cfa5 100644 --- a/conf/st2.conf.sample +++ b/conf/st2.conf.sample @@ -185,10 +185,10 @@ sleep_delay = 2 traces_ttl = None # Trigger instances older than this value (days) will be automatically deleted. Defaults to None (disabled). trigger_instances_ttl = None -# Workflow execution output objects (ones generated by action output streaming) older than this value (days) will be automatically deleted. -workflow_execution_ttl = 7 -# Task execution output objects (ones generated by action output streaming) older than this value (days) will be automatically deleted. -task_execution_ttl = 7 +# Workflow execution output objects (ones generated by action output streaming) older than this value (days) will be automatically deleted. Defaults to None (disabled). +workflow_executions_ttl = 7 +# Task execution output objects (ones generated by action output streaming) older than this value (days) will be automatically deleted. Defaults to None (disabled). +task_executions_ttl = 7 [keyvalue] diff --git a/st2common/st2common/cmd/purge_task_executions.py b/st2common/st2common/cmd/purge_task_executions.py index 2d6155c25b..298fad1ffd 100644 --- a/st2common/st2common/cmd/purge_task_executions.py +++ b/st2common/st2common/cmd/purge_task_executions.py @@ -1,4 +1,4 @@ -# Copyright 2019 Extreme Networks, Inc. +# Copyright 2021, The StackStorm Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -37,25 +37,27 @@ from st2common.constants.exit_codes import FAILURE_EXIT_CODE from st2common.garbage_collection.workflows import purge_task_execution -__all__ = [ - 'main' -] +__all__ = ["main"] LOG = logging.getLogger(__name__) def _register_cli_opts(): cli_opts = [ - cfg.StrOpt('timestamp', default=None, - help='Will delete execution and liveaction models older than ' + - 'this UTC timestamp. ' + - 'Example value: 2015-03-13T19:01:27.255542Z.'), - cfg.StrOpt('action-ref', default='', - help='action-ref to delete executions for.'), - cfg.BoolOpt('purge-incomplete', default=False, - help='Purge all models irrespective of their ``status``.' + - 'By default, only executions in completed states such as "succeeeded" ' + - ', "failed", "canceled" and "timed_out" are deleted.'), + cfg.StrOpt( + "timestamp", + default=None, + help="Will delete workflow execution objects older than " + + "this UTC timestamp. " + + "Example value: 2015-03-13T19:01:27.255542Z.", + ), + cfg.BoolOpt( + "purge-incomplete", + default=False, + help="Purge all models irrespective of their ``status``." + + 'By default, only executions in completed states such as "succeeeded" ' + + ', "failed", "canceled" and "timed_out" are deleted.', + ), ] do_register_cli_opts(cli_opts) @@ -66,19 +68,19 @@ def main(): # Get config values timestamp = cfg.CONF.timestamp - action_ref = cfg.CONF.action_ref purge_incomplete = cfg.CONF.purge_incomplete if not timestamp: - LOG.error('Please supply a timestamp for purging models. Aborting.') + LOG.error("Please supply a timestamp for purging models. Aborting.") return 1 else: - timestamp = datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%fZ') + timestamp = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ") timestamp = timestamp.replace(tzinfo=pytz.UTC) try: - purge_task_execution(logger=LOG, timestamp=timestamp, action_ref=action_ref, - purge_incomplete=purge_incomplete) + purge_task_execution( + logger=LOG, timestamp=timestamp, purge_incomplete=purge_incomplete + ) except Exception as e: LOG.exception(six.text_type(e)) return FAILURE_EXIT_CODE diff --git a/st2common/st2common/cmd/purge_workflows.py b/st2common/st2common/cmd/purge_workflows.py index 58479526c1..ee091655e2 100644 --- a/st2common/st2common/cmd/purge_workflows.py +++ b/st2common/st2common/cmd/purge_workflows.py @@ -1,4 +1,4 @@ -# Copyright 2019 Extreme Networks, Inc. +# Copyright 2021, The StackStorm Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -37,25 +37,27 @@ from st2common.constants.exit_codes import FAILURE_EXIT_CODE from st2common.garbage_collection.workflows import purge_workflow_execution -__all__ = [ - 'main' -] +__all__ = ["main"] LOG = logging.getLogger(__name__) def _register_cli_opts(): cli_opts = [ - cfg.StrOpt('timestamp', default=None, - help='Will delete execution and liveaction models older than ' + - 'this UTC timestamp. ' + - 'Example value: 2015-03-13T19:01:27.255542Z.'), - cfg.StrOpt('action-ref', default='', - help='action-ref to delete executions for.'), - cfg.BoolOpt('purge-incomplete', default=False, - help='Purge all models irrespective of their ``status``.' + - 'By default, only executions in completed states such as "succeeeded" ' + - ', "failed", "canceled" and "timed_out" are deleted.'), + cfg.StrOpt( + "timestamp", + default=None, + help="Will delete workflow execution older than " + + "this UTC timestamp. " + + "Example value: 2015-03-13T19:01:27.255542Z.", + ), + cfg.BoolOpt( + "purge-incomplete", + default=False, + help="Purge all models irrespective of their ``status``." + + 'By default, only executions in completed states such as "succeeeded" ' + + ', "failed", "canceled" and "timed_out" are deleted.', + ), ] do_register_cli_opts(cli_opts) @@ -66,19 +68,19 @@ def main(): # Get config values timestamp = cfg.CONF.timestamp - action_ref = cfg.CONF.action_ref purge_incomplete = cfg.CONF.purge_incomplete if not timestamp: - LOG.error('Please supply a timestamp for purging models. Aborting.') + LOG.error("Please supply a timestamp for purging models. Aborting.") return 1 else: - timestamp = datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%fZ') + timestamp = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ") timestamp = timestamp.replace(tzinfo=pytz.UTC) try: - purge_workflow_execution(logger=LOG, timestamp=timestamp, action_ref=action_ref, - purge_incomplete=purge_incomplete) + purge_workflow_execution( + logger=LOG, timestamp=timestamp, purge_incomplete=purge_incomplete + ) except Exception as e: LOG.exception(six.text_type(e)) return FAILURE_EXIT_CODE diff --git a/st2common/st2common/garbage_collection/workflows.py b/st2common/st2common/garbage_collection/workflows.py index 8a2325d3e8..09eaf0af45 100644 --- a/st2common/st2common/garbage_collection/workflows.py +++ b/st2common/st2common/garbage_collection/workflows.py @@ -1,4 +1,4 @@ -# Copyright 2020 Extreme Networks, Inc. +# Copyright 2021, The StackStorm Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,7 +13,7 @@ # limitations under the License. """ -Module with utility functions for purging old action workflows. +Module with utility functions for purging old workflow executions. """ from __future__ import absolute_import @@ -27,56 +27,47 @@ from st2common.persistence.workflow import TaskExecution -__all__ = [ - 'purge_workflow_execution', - 'purge_task_execution' -] +__all__ = ["purge_workflow_execution", "purge_task_execution"] -#TODO: Are these valid too.. -DONE_STATES = [action_constants.LIVEACTION_STATUS_SUCCEEDED, - action_constants.LIVEACTION_STATUS_FAILED, - action_constants.LIVEACTION_STATUS_TIMED_OUT, - action_constants.LIVEACTION_STATUS_CANCELED] +# TODO: Are these valid too.. +DONE_STATES = [ + action_constants.LIVEACTION_STATUS_SUCCEEDED, + action_constants.LIVEACTION_STATUS_FAILED, + action_constants.LIVEACTION_STATUS_TIMED_OUT, + action_constants.LIVEACTION_STATUS_CANCELED, +] -def purge_workflow_execution(logger, timestamp, action_ref=None, purge_incomplete=False): +def purge_workflow_execution(logger, timestamp, purge_incomplete=False): """ - Purge action executions and corresponding live action, execution output objects. + Purge workflow execution output objects. :param timestamp: Exections older than this timestamp will be deleted. :type timestamp: ``datetime.datetime - :param action_ref: Only delete executions for the provided actions. - :type action_ref: ``str`` - :param purge_incomplete: True to also delete executions which are not in a done state. :type purge_incomplete: ``bool`` """ if not timestamp: - raise ValueError('Specify a valid timestamp to purge.') + raise ValueError("Specify a valid timestamp to purge.") - logger.info('Purging executions older than timestamp: %s' % - timestamp.strftime('%Y-%m-%dT%H:%M:%S.%fZ')) + logger.info( + "Purging executions older than timestamp: %s" + % timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + ) filters = {} if purge_incomplete: - filters['start_timestamp__lt'] = timestamp + filters["start_timestamp__lt"] = timestamp else: - filters['end_timestamp__lt'] = timestamp - filters['start_timestamp__lt'] = timestamp - filters['status'] = {'$in': DONE_STATES} + filters["end_timestamp__lt"] = timestamp + filters["start_timestamp__lt"] = timestamp + filters["status"] = {"$in": DONE_STATES} exec_filters = copy.copy(filters) - # TODO: Are these parameters valid. - # if action_ref: - # exec_filters['action__ref'] = action_ref - # - # liveaction_filters = copy.deepcopy(filters) - # if action_ref: - # liveaction_filters['action'] = action_ref - - # 1. Delete ActionExecutionDB objects + + # 1. Delete Workflow Execution objects try: # Note: We call list() on the query set object because it's lazyily evaluated otherwise # to_delete_execution_dbs = list(WorkflowExecution.query(only_fields=['id'], @@ -84,77 +75,80 @@ def purge_workflow_execution(logger, timestamp, action_ref=None, purge_incomplet # **exec_filters)) deleted_count = WorkflowExecution.delete_by_query(**exec_filters) except InvalidQueryError as e: - msg = ('Bad query (%s) used to delete execution instances: %s' - 'Please contact support.' % (exec_filters, six.text_type(e))) + msg = ( + "Bad query (%s) used to delete execution instances: %s" + "Please contact support." % (exec_filters, six.text_type(e)) + ) raise InvalidQueryError(msg) except: - logger.exception('Deletion of execution models failed for query with filters: %s.', - exec_filters) + logger.exception( + "Deletion of execution models failed for query with filters: %s.", + exec_filters, + ) else: - logger.info('Deleted %s action execution objects' % deleted_count) + logger.info("Deleted %s workflow execution objects" % deleted_count) - zombie_execution_instances = len(WorkflowExecution.query(only_fields=['id'], - no_dereference=True, - **exec_filters)) + zombie_execution_instances = len( + WorkflowExecution.query(only_fields=["id"], no_dereference=True, **exec_filters) + ) if zombie_execution_instances > 0: - logger.error('Zombie execution instances left: %d.', zombie_execution_instances) + logger.error("Zombie execution instances left: %d.", zombie_execution_instances) # Print stats - logger.info('All execution models older than timestamp %s were deleted.', timestamp) + logger.info("All execution models older than timestamp %s were deleted.", timestamp) -def purge_task_execution(logger, timestamp, action_ref=None, purge_incomplete=False): +def purge_task_execution(logger, timestamp, purge_incomplete=False): """ - Purge action executions and corresponding live action, execution output objects. + Purge task execution output objects. :param timestamp: Exections older than this timestamp will be deleted. :type timestamp: ``datetime.datetime - :param action_ref: Only delete executions for the provided actions. - :type action_ref: ``str`` - :param purge_incomplete: True to also delete executions which are not in a done state. :type purge_incomplete: ``bool`` """ if not timestamp: - raise ValueError('Specify a valid timestamp to purge.') + raise ValueError("Specify a valid timestamp to purge.") - logger.info('Purging executions older than timestamp: %s' % - timestamp.strftime('%Y-%m-%dT%H:%M:%S.%fZ')) + logger.info( + "Purging executions older than timestamp: %s" + % timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + ) filters = {} if purge_incomplete: - filters['start_timestamp__lt'] = timestamp + filters["start_timestamp__lt"] = timestamp else: - filters['end_timestamp__lt'] = timestamp - filters['start_timestamp__lt'] = timestamp - filters['status'] = {'$in': DONE_STATES} + filters["end_timestamp__lt"] = timestamp + filters["start_timestamp__lt"] = timestamp + filters["status"] = {"$in": DONE_STATES} exec_filters = copy.copy(filters) try: - # Note: We call list() on the query set object because it's lazyily evaluated otherwise - # to_delete_execution_dbs = list(WorkflowExecution.query(only_fields=['id'], - # no_dereference=True, - # **exec_filters)) deleted_count = TaskExecution.delete_by_query(**exec_filters) except InvalidQueryError as e: - msg = ('Bad query (%s) used to delete execution instances: %s' - 'Please contact support.' % (exec_filters, six.text_type(e))) + msg = ( + "Bad query (%s) used to delete execution instances: %s" + "Please contact support." % (exec_filters, six.text_type(e)) + ) raise InvalidQueryError(msg) except: - logger.exception('Deletion of execution models failed for query with filters: %s.', - exec_filters) + logger.exception( + "Deletion of execution models failed for query with filters: %s.", + exec_filters, + ) else: - logger.info('Deleted %s action execution objects' % deleted_count) + logger.info("Deleted %s task execution objects" % deleted_count) - zombie_execution_instances = len(WorkflowExecution.query(only_fields=['id'], - no_dereference=True, - **exec_filters)) + zombie_execution_instances = len( + TaskExecution.query(only_fields=["id"], no_dereference=True, **exec_filters) + ) if zombie_execution_instances > 0: - logger.error('Zombie execution instances left: %d.', zombie_execution_instances) + logger.error("Zombie execution instances left: %d.", zombie_execution_instances) # Print stats - logger.info('All execution models older than timestamp %s were deleted.', timestamp) \ No newline at end of file + logger.info("All execution models older than timestamp %s were deleted.", timestamp) diff --git a/st2common/tests/unit/test_purge_task_executions.py b/st2common/tests/unit/test_purge_task_executions.py index e67934d660..083d135e9f 100644 --- a/st2common/tests/unit/test_purge_task_executions.py +++ b/st2common/tests/unit/test_purge_task_executions.py @@ -1,4 +1,4 @@ -# Copyright 2019 Extreme Networks, Inc. +# Copyright 2021, The StackStorm Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,7 +16,6 @@ from datetime import timedelta from st2common import log as logging -from st2common.constants.triggers import TRIGGER_INSTANCE_PROCESSED from st2common.garbage_collection.workflows import purge_task_execution from st2common.models.db.workflow import TaskExecutionDB from st2common.persistence.workflow import TaskExecution @@ -27,7 +26,6 @@ class TestPurgeTaskExecutionInstances(CleanDbTestCase): - @classmethod def setUpClass(cls): CleanDbTestCase.setUpClass() @@ -39,32 +37,38 @@ def setUp(self): def test_no_timestamp_doesnt_delete(self): now = date_utils.get_datetime_utc_now() - instance_db = TaskExecutionDB(trigger='purge_tool.dummy.trigger', - payload={'hola': 'hi', 'kuraci': 'chicken'}, - occurrence_time=now - timedelta(days=20), - status='succeeded') + instance_db = TaskExecutionDB( + trigger="purge_tool.dummy.trigger", + payload={"hola": "hi", "kuraci": "chicken"}, + occurrence_time=now - timedelta(days=20), + status="succeeded", + ) TaskExecution.add_or_update(instance_db) self.assertEqual(len(TaskExecution.get_all()), 1) - expected_msg = 'Specify a valid timestamp' - self.assertRaisesRegexp(ValueError, expected_msg, - purge_task_execution, - logger=LOG, timestamp=None) + expected_msg = "Specify a valid timestamp" + self.assertRaisesRegexp( + ValueError, expected_msg, purge_task_execution, logger=LOG, timestamp=None + ) self.assertEqual(len(TaskExecution.get_all()), 1) def test_purge(self): now = date_utils.get_datetime_utc_now() - instance_db = TaskExecutionDB(trigger='purge_tool.dummy.trigger', - payload={'hola': 'hi', 'kuraci': 'chicken'}, - occurrence_time=now - timedelta(days=20), - status='failed') + instance_db = TaskExecutionDB( + trigger="purge_tool.dummy.trigger", + payload={"hola": "hi", "kuraci": "chicken"}, + occurrence_time=now - timedelta(days=20), + status="failed", + ) TaskExecution.add_or_update(instance_db) - instance_db = TaskExecutionDB(trigger='purge_tool.dummy.trigger', - payload={'hola': 'hi', 'kuraci': 'chicken'}, - occurrence_time=now - timedelta(days=5), - status='canceled') + instance_db = TaskExecutionDB( + trigger="purge_tool.dummy.trigger", + payload={"hola": "hi", "kuraci": "chicken"}, + occurrence_time=now - timedelta(days=5), + status="canceled", + ) TaskExecution.add_or_update(instance_db) self.assertEqual(len(TaskExecution.get_all()), 2) diff --git a/st2common/tests/unit/test_purge_worklows.py b/st2common/tests/unit/test_purge_worklows.py index 5f6620c7ff..5d4ca219d8 100644 --- a/st2common/tests/unit/test_purge_worklows.py +++ b/st2common/tests/unit/test_purge_worklows.py @@ -1,4 +1,4 @@ -# Copyright 2019 Extreme Networks, Inc. +# Copyright 2021, The StackStorm Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,7 +16,6 @@ from datetime import timedelta from st2common import log as logging -from st2common.constants.triggers import TRIGGER_INSTANCE_PROCESSED from st2common.garbage_collection.workflows import purge_workflow_execution from st2common.models.db.workflow import WorkflowExecutionDB from st2common.persistence.workflow import WorkflowExecution @@ -27,7 +26,6 @@ class TestPurgeWorkflowExecutionInstances(CleanDbTestCase): - @classmethod def setUpClass(cls): CleanDbTestCase.setUpClass() @@ -39,32 +37,42 @@ def setUp(self): def test_no_timestamp_doesnt_delete(self): now = date_utils.get_datetime_utc_now() - instance_db = WorkflowExecutionDB(trigger='purge_tool.dummy.trigger', - payload={'hola': 'hi', 'kuraci': 'chicken'}, - occurrence_time=now - timedelta(days=20), - status='running') + instance_db = WorkflowExecutionDB( + trigger="purge_tool.dummy.trigger", + payload={"hola": "hi", "kuraci": "chicken"}, + occurrence_time=now - timedelta(days=20), + status="running", + ) WorkflowExecution.add_or_update(instance_db) self.assertEqual(len(WorkflowExecution.get_all()), 1) - expected_msg = 'Specify a valid timestamp' - self.assertRaisesRegexp(ValueError, expected_msg, - purge_workflow_execution, - logger=LOG, timestamp=None) + expected_msg = "Specify a valid timestamp" + self.assertRaisesRegexp( + ValueError, + expected_msg, + purge_workflow_execution, + logger=LOG, + timestamp=None, + ) self.assertEqual(len(WorkflowExecution.get_all()), 1) def test_purge(self): now = date_utils.get_datetime_utc_now() - instance_db = WorkflowExecutionDB(trigger='purge_tool.dummy.trigger', - payload={'hola': 'hi', 'kuraci': 'chicken'}, - occurrence_time=now - timedelta(days=20), - status='running') + instance_db = WorkflowExecutionDB( + trigger="purge_tool.dummy.trigger", + payload={"hola": "hi", "kuraci": "chicken"}, + occurrence_time=now - timedelta(days=20), + status="running", + ) WorkflowExecution.add_or_update(instance_db) - instance_db = WorkflowExecutionDB(trigger='purge_tool.dummy.trigger', - payload={'hola': 'hi', 'kuraci': 'chicken'}, - occurrence_time=now - timedelta(days=5), - status='succeeded') + instance_db = WorkflowExecutionDB( + trigger="purge_tool.dummy.trigger", + payload={"hola": "hi", "kuraci": "chicken"}, + occurrence_time=now - timedelta(days=5), + status="succeeded", + ) WorkflowExecution.add_or_update(instance_db) self.assertEqual(len(WorkflowExecution.get_all()), 2) diff --git a/st2reactor/st2reactor/garbage_collector/base.py b/st2reactor/st2reactor/garbage_collector/base.py index d7f76ea594..baba86734b 100644 --- a/st2reactor/st2reactor/garbage_collector/base.py +++ b/st2reactor/st2reactor/garbage_collector/base.py @@ -40,7 +40,10 @@ from st2common.garbage_collection.executions import purge_execution_output_objects from st2common.garbage_collection.executions import purge_orphaned_workflow_executions from st2common.garbage_collection.inquiries import purge_inquiries -from st2common.garbage_collection.workflows import purge_workflow_execution, purge_task_execution +from st2common.garbage_collection.workflows import ( + purge_workflow_execution, + purge_task_execution, +) from st2common.garbage_collection.trigger_instances import purge_trigger_instances from st2common.garbage_collection.trace import purge_traces from st2common.garbage_collection.rule_enforcement import purge_rule_enforcements @@ -76,8 +79,10 @@ def __init__( self._rule_enforcements_ttl = cfg.CONF.garbagecollector.rule_enforcements_ttl self._purge_inquiries = cfg.CONF.garbagecollector.purge_inquiries self._workflow_execution_max_idle = cfg.CONF.workflow_engine.gc_max_idle_sec - self._workflow_execution_ttl = cfg.CONF.garbagecollector.workflow_execution_ttl - self._task_execution_ttl = cfg.CONF.garbagecollector.task_execution_ttl + self._workflow_executions_ttl = ( + cfg.CONF.garbagecollector.workflow_executions_ttl + ) + self._task_executions_ttl = cfg.CONF.garbagecollector.task_executions_ttl self._validate_ttl_values() @@ -255,16 +260,19 @@ def _perform_garbage_collection(self): else: LOG.debug(skip_message, obj_type) - obj_type = 'task executions' - if self._task_execution_ttl and self._task_execution_ttl >= MINIMUM_TTL_DAYS: + obj_type = "task executions" + if self._task_executions_ttl and self._task_executions_ttl >= MINIMUM_TTL_DAYS: LOG.info(proc_message, obj_type) self._purge_task_executions() concurrency.sleep(self._sleep_delay) else: LOG.debug(skip_message, obj_type) - obj_type = 'workflow executions' - if self._workflow_execution_ttl and self._workflow_execution_ttl >= MINIMUM_TTL_DAYS: + obj_type = "workflow executions" + if ( + self._workflow_executions_ttl + and self._workflow_executions_ttl >= MINIMUM_TTL_DAYS + ): LOG.info(proc_message, obj_type) self._purge_workflow_executions() concurrency.sleep(self._sleep_delay) @@ -307,21 +315,25 @@ def _purge_workflow_executions(self): the criteria defined in the config. """ utc_now = get_datetime_utc_now() - timestamp = (utc_now - datetime.timedelta(days=self._action_executions_ttl)) + timestamp = utc_now - datetime.timedelta(days=self._workflow_executions_ttl) # Another sanity check to make sure we don't delete new executions if timestamp > (utc_now - datetime.timedelta(days=MINIMUM_TTL_DAYS)): - raise ValueError('Calculated timestamp would violate the minimum TTL constraint') + raise ValueError( + "Calculated timestamp would violate the minimum TTL constraint" + ) timestamp_str = isotime.format(dt=timestamp) - LOG.info('Deleting workflow executions older than: %s' % (timestamp_str)) + LOG.info("Deleting workflow executions older than: %s" % (timestamp_str)) assert timestamp < utc_now try: purge_workflow_execution(logger=LOG, timestamp=timestamp) except Exception as e: - LOG.exception('Failed to delete workflow executions: %s' % (six.text_type(e))) + LOG.exception( + "Failed to delete workflow executions: %s" % (six.text_type(e)) + ) return True @@ -331,21 +343,23 @@ def _purge_task_executions(self): the criteria defined in the config. """ utc_now = get_datetime_utc_now() - timestamp = (utc_now - datetime.timedelta(days=self._action_executions_ttl)) + timestamp = utc_now - datetime.timedelta(days=self._task_executions_ttl) # Another sanity check to make sure we don't delete new executions if timestamp > (utc_now - datetime.timedelta(days=MINIMUM_TTL_DAYS)): - raise ValueError('Calculated timestamp would violate the minimum TTL constraint') + raise ValueError( + "Calculated timestamp would violate the minimum TTL constraint" + ) timestamp_str = isotime.format(dt=timestamp) - LOG.info('Deleting task executions older than: %s' % (timestamp_str)) + LOG.info("Deleting task executions older than: %s" % (timestamp_str)) assert timestamp < utc_now try: purge_task_execution(logger=LOG, timestamp=timestamp) except Exception as e: - LOG.exception('Failed to delete task executions: %s' % (six.text_type(e))) + LOG.exception("Failed to delete task executions: %s" % (six.text_type(e))) return True diff --git a/st2reactor/st2reactor/garbage_collector/config.py b/st2reactor/st2reactor/garbage_collector/config.py index c70074939d..54327b9de2 100644 --- a/st2reactor/st2reactor/garbage_collector/config.py +++ b/st2reactor/st2reactor/garbage_collector/config.py @@ -106,6 +106,16 @@ def _register_garbage_collector_opts(ignore_errors=False): default=None, help="Trace objects older than this value (days) will be automatically deleted. Defaults to None (disabled).", ), + cfg.IntOpt( + "workflow_executions_ttl", + default=None, + help="Workflow execution output objects (ones generated by action output streaming) older than this value (days) will be automatically deleted. Defaults to None (disabled).", + ), + cfg.IntOpt( + "task_executions_ttl", + default=None, + help="Task execution output objects (ones generated by action output streaming) older than this value (days) will be automatically deleted. Defaults to None (disabled).", + ), ] common_config.do_register_opts( From d84c290214c3de9a471219ebf49def3dbe9244d2 Mon Sep 17 00:00:00 2001 From: Amanda McGuinness Date: Fri, 1 Apr 2022 10:11:35 +0000 Subject: [PATCH 06/13] Change copyright as 2020 was when PR first submitted --- st2common/st2common/cmd/purge_task_executions.py | 2 +- st2common/st2common/cmd/purge_workflows.py | 2 +- st2common/st2common/garbage_collection/workflows.py | 2 +- st2common/tests/unit/test_purge_task_executions.py | 2 +- st2common/tests/unit/test_purge_worklows.py | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/st2common/st2common/cmd/purge_task_executions.py b/st2common/st2common/cmd/purge_task_executions.py index 298fad1ffd..5b60b3674f 100644 --- a/st2common/st2common/cmd/purge_task_executions.py +++ b/st2common/st2common/cmd/purge_task_executions.py @@ -1,4 +1,4 @@ -# Copyright 2021, The StackStorm Authors. +# Copyright 2020, The StackStorm Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/st2common/st2common/cmd/purge_workflows.py b/st2common/st2common/cmd/purge_workflows.py index ee091655e2..0fdbd81772 100644 --- a/st2common/st2common/cmd/purge_workflows.py +++ b/st2common/st2common/cmd/purge_workflows.py @@ -1,4 +1,4 @@ -# Copyright 2021, The StackStorm Authors. +# Copyright 2020, The StackStorm Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/st2common/st2common/garbage_collection/workflows.py b/st2common/st2common/garbage_collection/workflows.py index 09eaf0af45..3b269f1a45 100644 --- a/st2common/st2common/garbage_collection/workflows.py +++ b/st2common/st2common/garbage_collection/workflows.py @@ -1,4 +1,4 @@ -# Copyright 2021, The StackStorm Authors. +# Copyright 2020, The StackStorm Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/st2common/tests/unit/test_purge_task_executions.py b/st2common/tests/unit/test_purge_task_executions.py index 083d135e9f..dd25fbc5d8 100644 --- a/st2common/tests/unit/test_purge_task_executions.py +++ b/st2common/tests/unit/test_purge_task_executions.py @@ -1,4 +1,4 @@ -# Copyright 2021, The StackStorm Authors. +# Copyright 2020, The StackStorm Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/st2common/tests/unit/test_purge_worklows.py b/st2common/tests/unit/test_purge_worklows.py index 5d4ca219d8..ef0017c3e7 100644 --- a/st2common/tests/unit/test_purge_worklows.py +++ b/st2common/tests/unit/test_purge_worklows.py @@ -1,4 +1,4 @@ -# Copyright 2021, The StackStorm Authors. +# Copyright 2020, The StackStorm Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. From 41d6afbd50c9d3cbaad1f0a24b8777b29747116d Mon Sep 17 00:00:00 2001 From: Amanda McGuinness Date: Fri, 1 Apr 2022 10:32:04 +0000 Subject: [PATCH 07/13] Regenerate st2.conf.sample --- conf/st2.conf.sample | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/conf/st2.conf.sample b/conf/st2.conf.sample index ce3852cfa5..9a0ff55179 100644 --- a/conf/st2.conf.sample +++ b/conf/st2.conf.sample @@ -181,15 +181,14 @@ purge_inquiries = False rule_enforcements_ttl = None # How long to wait / sleep (in seconds) between collection of different object types. sleep_delay = 2 +# Task execution output objects (ones generated by action output streaming) older than this value (days) will be automatically deleted. Defaults to None (disabled). +task_executions_ttl = None # Trace objects older than this value (days) will be automatically deleted. Defaults to None (disabled). traces_ttl = None # Trigger instances older than this value (days) will be automatically deleted. Defaults to None (disabled). trigger_instances_ttl = None # Workflow execution output objects (ones generated by action output streaming) older than this value (days) will be automatically deleted. Defaults to None (disabled). -workflow_executions_ttl = 7 -# Task execution output objects (ones generated by action output streaming) older than this value (days) will be automatically deleted. Defaults to None (disabled). -task_executions_ttl = 7 - +workflow_executions_ttl = None [keyvalue] # Allow encryption of values in key value stored qualified as "secret". From 8ef4027ee3f734419ce39f7724ea31270318abee Mon Sep 17 00:00:00 2001 From: Amanda McGuinness Date: Fri, 1 Apr 2022 11:42:51 +0000 Subject: [PATCH 08/13] Black format --- st2common/setup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/st2common/setup.py b/st2common/setup.py index 42fbc90296..aa10d41dec 100644 --- a/st2common/setup.py +++ b/st2common/setup.py @@ -52,8 +52,8 @@ "bin/st2-cleanup-db", "bin/st2-register-content", "bin/st2-purge-executions", - 'bin/st2-purge-workflows', - 'bin/st2-purge-task-executions', + "bin/st2-purge-workflows", + "bin/st2-purge-task-executions", "bin/st2-purge-trigger-instances", "bin/st2-purge-traces", "bin/st2-purge-rule-enforcements", From 078387e2dc1ce4477f8a5c89cf090d0188aa15c7 Mon Sep 17 00:00:00 2001 From: Amanda McGuinness Date: Fri, 1 Apr 2022 11:54:02 +0000 Subject: [PATCH 09/13] Fix copyright statement, remove comma --- st2common/st2common/cmd/purge_task_executions.py | 2 +- st2common/st2common/cmd/purge_workflows.py | 2 +- st2common/st2common/garbage_collection/workflows.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/st2common/st2common/cmd/purge_task_executions.py b/st2common/st2common/cmd/purge_task_executions.py index 5b60b3674f..5887b9ad99 100644 --- a/st2common/st2common/cmd/purge_task_executions.py +++ b/st2common/st2common/cmd/purge_task_executions.py @@ -1,4 +1,4 @@ -# Copyright 2020, The StackStorm Authors. +# Copyright 2020 The StackStorm Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/st2common/st2common/cmd/purge_workflows.py b/st2common/st2common/cmd/purge_workflows.py index 0fdbd81772..43e027d396 100644 --- a/st2common/st2common/cmd/purge_workflows.py +++ b/st2common/st2common/cmd/purge_workflows.py @@ -1,4 +1,4 @@ -# Copyright 2020, The StackStorm Authors. +# Copyright 2020 The StackStorm Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/st2common/st2common/garbage_collection/workflows.py b/st2common/st2common/garbage_collection/workflows.py index 3b269f1a45..071f13a52f 100644 --- a/st2common/st2common/garbage_collection/workflows.py +++ b/st2common/st2common/garbage_collection/workflows.py @@ -1,4 +1,4 @@ -# Copyright 2020, The StackStorm Authors. +# Copyright 2020 The StackStorm Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. From 95e258c2bbf92faacf8f8b54c9d75c4d675ef1bf Mon Sep 17 00:00:00 2001 From: Amanda McGuinness Date: Fri, 1 Apr 2022 12:13:41 +0000 Subject: [PATCH 10/13] Fix UTs --- .../tests/unit/test_purge_task_executions.py | 56 +++++++++++++++---- st2common/tests/unit/test_purge_worklows.py | 54 ++++++++++++++---- 2 files changed, 88 insertions(+), 22 deletions(-) diff --git a/st2common/tests/unit/test_purge_task_executions.py b/st2common/tests/unit/test_purge_task_executions.py index dd25fbc5d8..e4c72495ee 100644 --- a/st2common/tests/unit/test_purge_task_executions.py +++ b/st2common/tests/unit/test_purge_task_executions.py @@ -1,4 +1,4 @@ -# Copyright 2020, The StackStorm Authors. +# Copyright 2020 The StackStorm Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -38,9 +38,8 @@ def test_no_timestamp_doesnt_delete(self): now = date_utils.get_datetime_utc_now() instance_db = TaskExecutionDB( - trigger="purge_tool.dummy.trigger", - payload={"hola": "hi", "kuraci": "chicken"}, - occurrence_time=now - timedelta(days=20), + start_timestamp=now - timedelta(days=20), + end_timestamp=now - timedelta(days=20), status="succeeded", ) TaskExecution.add_or_update(instance_db) @@ -56,21 +55,56 @@ def test_purge(self): now = date_utils.get_datetime_utc_now() instance_db = TaskExecutionDB( - trigger="purge_tool.dummy.trigger", - payload={"hola": "hi", "kuraci": "chicken"}, - occurrence_time=now - timedelta(days=20), + start_timestamp=now - timedelta(days=20), + end_timestamp=now - timedelta(days=20), status="failed", ) TaskExecution.add_or_update(instance_db) + # Addn incomplete instance_db = TaskExecutionDB( - trigger="purge_tool.dummy.trigger", - payload={"hola": "hi", "kuraci": "chicken"}, - occurrence_time=now - timedelta(days=5), + start_timestamp=now - timedelta(days=20), + status="running", + ) + TaskExecution.add_or_update(instance_db) + + instance_db = TaskExecutionDB( + start_timestamp=now - timedelta(days=5), + end_timestamp=now - timedelta(days=5), status="canceled", ) TaskExecution.add_or_update(instance_db) - self.assertEqual(len(TaskExecution.get_all()), 2) + self.assertEqual(len(TaskExecution.get_all()), 3) purge_task_execution(logger=LOG, timestamp=now - timedelta(days=10)) + self.assertEqual(len(TaskExecution.get_all()), 2) + + def test_purge_incomplete(self): + now = date_utils.get_datetime_utc_now() + + instance_db = TaskExecutionDB( + start_timestamp=now - timedelta(days=20), + end_timestamp=now - timedelta(days=20), + status="failed", + ) + TaskExecution.add_or_update(instance_db) + + # Addn incomplete + instance_db = TaskExecutionDB( + start_timestamp=now - timedelta(days=20), + status="running", + ) + TaskExecution.add_or_update(instance_db) + + instance_db = TaskExecutionDB( + start_timestamp=now - timedelta(days=5), + end_timestamp=now - timedelta(days=5), + status="canceled", + ) + TaskExecution.add_or_update(instance_db) + + self.assertEqual(len(TaskExecution.get_all()), 3) + purge_task_execution( + logger=LOG, timestamp=now - timedelta(days=10), purge_incomplete=True + ) self.assertEqual(len(TaskExecution.get_all()), 1) diff --git a/st2common/tests/unit/test_purge_worklows.py b/st2common/tests/unit/test_purge_worklows.py index ef0017c3e7..c79147b9cb 100644 --- a/st2common/tests/unit/test_purge_worklows.py +++ b/st2common/tests/unit/test_purge_worklows.py @@ -1,4 +1,4 @@ -# Copyright 2020, The StackStorm Authors. +# Copyright 2020 The StackStorm Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -38,9 +38,8 @@ def test_no_timestamp_doesnt_delete(self): now = date_utils.get_datetime_utc_now() instance_db = WorkflowExecutionDB( - trigger="purge_tool.dummy.trigger", - payload={"hola": "hi", "kuraci": "chicken"}, - occurrence_time=now - timedelta(days=20), + start_timestamp=now - timedelta(days=20), + end_timestamp=now - timedelta(days=20), status="running", ) WorkflowExecution.add_or_update(instance_db) @@ -60,21 +59,54 @@ def test_purge(self): now = date_utils.get_datetime_utc_now() instance_db = WorkflowExecutionDB( - trigger="purge_tool.dummy.trigger", - payload={"hola": "hi", "kuraci": "chicken"}, - occurrence_time=now - timedelta(days=20), + start_timestamp=now - timedelta(days=20), + end_timestamp=now - timedelta(days=20), + status="failed", + ) + WorkflowExecution.add_or_update(instance_db) + + instance_db = WorkflowExecutionDB( + start_timestamp=now - timedelta(days=20), status="running", ) WorkflowExecution.add_or_update(instance_db) instance_db = WorkflowExecutionDB( - trigger="purge_tool.dummy.trigger", - payload={"hola": "hi", "kuraci": "chicken"}, - occurrence_time=now - timedelta(days=5), + start_timestamp=now - timedelta(days=5), + end_timestamp=now - timedelta(days=5), status="succeeded", ) WorkflowExecution.add_or_update(instance_db) - self.assertEqual(len(WorkflowExecution.get_all()), 2) + self.assertEqual(len(WorkflowExecution.get_all()), 3) purge_workflow_execution(logger=LOG, timestamp=now - timedelta(days=10)) + self.assertEqual(len(WorkflowExecution.get_all()), 2) + + def test_purge_incomplete(self): + now = date_utils.get_datetime_utc_now() + + instance_db = WorkflowExecutionDB( + start_timestamp=now - timedelta(days=20), + end_timestamp=now - timedelta(days=20), + status="cancelled", + ) + WorkflowExecution.add_or_update(instance_db) + + instance_db = WorkflowExecutionDB( + start_timestamp=now - timedelta(days=20), + status="running", + ) + WorkflowExecution.add_or_update(instance_db) + + instance_db = WorkflowExecutionDB( + start_timestamp=now - timedelta(days=5), + end_timestamp=now - timedelta(days=5), + status="succeeded", + ) + WorkflowExecution.add_or_update(instance_db) + + self.assertEqual(len(WorkflowExecution.get_all()), 3) + purge_workflow_execution( + logger=LOG, timestamp=now - timedelta(days=10), purge_incomplete=True + ) self.assertEqual(len(WorkflowExecution.get_all()), 1) From c85250947a6ccdc3736539a24c6cb1be3425f6f2 Mon Sep 17 00:00:00 2001 From: Amanda McGuinness Date: Fri, 1 Apr 2022 12:19:52 +0000 Subject: [PATCH 11/13] Missed tests config.py --- st2tests/st2tests/config.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/st2tests/st2tests/config.py b/st2tests/st2tests/config.py index 1d34159d8c..3732f8c83d 100644 --- a/st2tests/st2tests/config.py +++ b/st2tests/st2tests/config.py @@ -494,6 +494,16 @@ def _register_garbage_collector_opts(): default=None, help="Trace objects older than this value (days) will be automatically deleted. Defaults to None (disabled).", ), + cfg.IntOpt( + "workflow_executions_ttl", + default=None, + help="Workflow execution output objects (ones generated by action output streaming) older than this value (days) will be automatically deleted. Defaults to None (disabled).", + ), + cfg.IntOpt( + "task_executions_ttl", + default=None, + help="Task execution output objects (ones generated by action output streaming) older than this value (days) will be automatically deleted. Defaults to None (disabled).", + ), ] _register_opts(ttl_opts, group="garbagecollector") From 78b34532a6cae4da027e44f9984a4154107f2339 Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Fri, 1 Apr 2022 09:30:58 -0500 Subject: [PATCH 12/13] More consistent comments and method naming + a few misc spacing cleanups --- conf/st2.conf.sample | 4 +-- st2common/bin/st2-purge-task-executions | 2 +- st2common/bin/st2-purge-workflows | 2 +- .../st2common/cmd/purge_task_executions.py | 14 ++++---- st2common/st2common/cmd/purge_workflows.py | 14 ++++---- .../st2common/garbage_collection/workflows.py | 35 ++++++++++++------- .../tests/unit/test_purge_task_executions.py | 10 +++--- st2common/tests/unit/test_purge_worklows.py | 10 +++--- .../st2reactor/garbage_collector/base.py | 24 +++++++------ .../st2reactor/garbage_collector/config.py | 8 +++-- st2tests/st2tests/config.py | 8 +++-- 11 files changed, 75 insertions(+), 56 deletions(-) diff --git a/conf/st2.conf.sample b/conf/st2.conf.sample index 9a0ff55179..bb6bbd8d42 100644 --- a/conf/st2.conf.sample +++ b/conf/st2.conf.sample @@ -181,13 +181,13 @@ purge_inquiries = False rule_enforcements_ttl = None # How long to wait / sleep (in seconds) between collection of different object types. sleep_delay = 2 -# Task execution output objects (ones generated by action output streaming) older than this value (days) will be automatically deleted. Defaults to None (disabled). +# Workflow task execution output objects (generated by action output streaming) older than this value (days) will be automatically deleted. Defaults to None (disabled). task_executions_ttl = None # Trace objects older than this value (days) will be automatically deleted. Defaults to None (disabled). traces_ttl = None # Trigger instances older than this value (days) will be automatically deleted. Defaults to None (disabled). trigger_instances_ttl = None -# Workflow execution output objects (ones generated by action output streaming) older than this value (days) will be automatically deleted. Defaults to None (disabled). +# Workflow execution output objects (generated by action output streaming) older than this value (days) will be automatically deleted. Defaults to None (disabled). workflow_executions_ttl = None [keyvalue] diff --git a/st2common/bin/st2-purge-task-executions b/st2common/bin/st2-purge-task-executions index eb052676d7..bff72f36b6 100644 --- a/st2common/bin/st2-purge-task-executions +++ b/st2common/bin/st2-purge-task-executions @@ -19,4 +19,4 @@ import sys from st2common.cmd.purge_task_executions import main if __name__ == '__main__': - sys.exit(main()) \ No newline at end of file + sys.exit(main()) diff --git a/st2common/bin/st2-purge-workflows b/st2common/bin/st2-purge-workflows index 0480c22131..af90749993 100644 --- a/st2common/bin/st2-purge-workflows +++ b/st2common/bin/st2-purge-workflows @@ -19,4 +19,4 @@ import sys from st2common.cmd.purge_workflows import main if __name__ == '__main__': - sys.exit(main()) \ No newline at end of file + sys.exit(main()) diff --git a/st2common/st2common/cmd/purge_task_executions.py b/st2common/st2common/cmd/purge_task_executions.py index 5887b9ad99..61f196484c 100644 --- a/st2common/st2common/cmd/purge_task_executions.py +++ b/st2common/st2common/cmd/purge_task_executions.py @@ -1,4 +1,4 @@ -# Copyright 2020 The StackStorm Authors. +# Copyright 2022 The StackStorm Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ """ -A utility script that purges st2 executions older than certain +A utility script that purges st2 workflow task executions older than certain timestamp. *** RISK RISK RISK. You will lose data. Run at your own risk. *** @@ -35,7 +35,7 @@ from st2common.script_setup import teardown as common_teardown from st2common.constants.exit_codes import SUCCESS_EXIT_CODE from st2common.constants.exit_codes import FAILURE_EXIT_CODE -from st2common.garbage_collection.workflows import purge_task_execution +from st2common.garbage_collection.workflows import purge_task_executions __all__ = ["main"] @@ -47,7 +47,7 @@ def _register_cli_opts(): cfg.StrOpt( "timestamp", default=None, - help="Will delete workflow execution objects older than " + help="Will delete workflow task execution objects older than " + "this UTC timestamp. " + "Example value: 2015-03-13T19:01:27.255542Z.", ), @@ -55,8 +55,8 @@ def _register_cli_opts(): "purge-incomplete", default=False, help="Purge all models irrespective of their ``status``." - + 'By default, only executions in completed states such as "succeeeded" ' - + ', "failed", "canceled" and "timed_out" are deleted.', + + 'By default, only workflow task executions in completed states such as ' + + '"succeeeded", "failed", "canceled" and "timed_out" are deleted.', ), ] do_register_cli_opts(cli_opts) @@ -78,7 +78,7 @@ def main(): timestamp = timestamp.replace(tzinfo=pytz.UTC) try: - purge_task_execution( + purge_task_executions( logger=LOG, timestamp=timestamp, purge_incomplete=purge_incomplete ) except Exception as e: diff --git a/st2common/st2common/cmd/purge_workflows.py b/st2common/st2common/cmd/purge_workflows.py index 43e027d396..b7995db6db 100644 --- a/st2common/st2common/cmd/purge_workflows.py +++ b/st2common/st2common/cmd/purge_workflows.py @@ -1,4 +1,4 @@ -# Copyright 2020 The StackStorm Authors. +# Copyright 2022 The StackStorm Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ """ -A utility script that purges st2 executions older than certain +A utility script that purges st2 workflow executions older than certain timestamp. *** RISK RISK RISK. You will lose data. Run at your own risk. *** @@ -35,7 +35,7 @@ from st2common.script_setup import teardown as common_teardown from st2common.constants.exit_codes import SUCCESS_EXIT_CODE from st2common.constants.exit_codes import FAILURE_EXIT_CODE -from st2common.garbage_collection.workflows import purge_workflow_execution +from st2common.garbage_collection.workflows import purge_workflow_executions __all__ = ["main"] @@ -47,7 +47,7 @@ def _register_cli_opts(): cfg.StrOpt( "timestamp", default=None, - help="Will delete workflow execution older than " + help="Will delete workflow execution objects older than " + "this UTC timestamp. " + "Example value: 2015-03-13T19:01:27.255542Z.", ), @@ -55,8 +55,8 @@ def _register_cli_opts(): "purge-incomplete", default=False, help="Purge all models irrespective of their ``status``." - + 'By default, only executions in completed states such as "succeeeded" ' - + ', "failed", "canceled" and "timed_out" are deleted.', + + 'By default, only workflow executions in completed states such as ' + + '"succeeeded", "failed", "canceled" and "timed_out" are deleted.', ), ] do_register_cli_opts(cli_opts) @@ -78,7 +78,7 @@ def main(): timestamp = timestamp.replace(tzinfo=pytz.UTC) try: - purge_workflow_execution( + purge_workflow_executions( logger=LOG, timestamp=timestamp, purge_incomplete=purge_incomplete ) except Exception as e: diff --git a/st2common/st2common/garbage_collection/workflows.py b/st2common/st2common/garbage_collection/workflows.py index 071f13a52f..d815124353 100644 --- a/st2common/st2common/garbage_collection/workflows.py +++ b/st2common/st2common/garbage_collection/workflows.py @@ -1,4 +1,4 @@ -# Copyright 2020 The StackStorm Authors. +# Copyright 2022 The StackStorm Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -27,7 +27,7 @@ from st2common.persistence.workflow import TaskExecution -__all__ = ["purge_workflow_execution", "purge_task_execution"] +__all__ = ["purge_workflow_executions", "purge_task_executions"] # TODO: Are these valid too.. DONE_STATES = [ @@ -38,7 +38,7 @@ ] -def purge_workflow_execution(logger, timestamp, purge_incomplete=False): +def purge_workflow_executions(logger, timestamp, purge_incomplete=False): """ Purge workflow execution output objects. @@ -52,7 +52,7 @@ def purge_workflow_execution(logger, timestamp, purge_incomplete=False): raise ValueError("Specify a valid timestamp to purge.") logger.info( - "Purging executions older than timestamp: %s" + "Purging workflow executions older than timestamp: %s" % timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ") ) @@ -76,13 +76,13 @@ def purge_workflow_execution(logger, timestamp, purge_incomplete=False): deleted_count = WorkflowExecution.delete_by_query(**exec_filters) except InvalidQueryError as e: msg = ( - "Bad query (%s) used to delete execution instances: %s" + "Bad query (%s) used to delete workflow execution instances: %s" "Please contact support." % (exec_filters, six.text_type(e)) ) raise InvalidQueryError(msg) except: logger.exception( - "Deletion of execution models failed for query with filters: %s.", + "Deletion of workflow execution models failed for query with filters: %s.", exec_filters, ) else: @@ -93,13 +93,18 @@ def purge_workflow_execution(logger, timestamp, purge_incomplete=False): ) if zombie_execution_instances > 0: - logger.error("Zombie execution instances left: %d.", zombie_execution_instances) + logger.error( + "Zombie workflow execution instances left: %d.", zombie_execution_instances + ) # Print stats - logger.info("All execution models older than timestamp %s were deleted.", timestamp) + logger.info( + "All workflow execution models older than timestamp %s were deleted.", + timestamp, + ) -def purge_task_execution(logger, timestamp, purge_incomplete=False): +def purge_task_executions(logger, timestamp, purge_incomplete=False): """ Purge task execution output objects. @@ -131,13 +136,13 @@ def purge_task_execution(logger, timestamp, purge_incomplete=False): deleted_count = TaskExecution.delete_by_query(**exec_filters) except InvalidQueryError as e: msg = ( - "Bad query (%s) used to delete execution instances: %s" + "Bad query (%s) used to delete task execution instances: %s" "Please contact support." % (exec_filters, six.text_type(e)) ) raise InvalidQueryError(msg) except: logger.exception( - "Deletion of execution models failed for query with filters: %s.", + "Deletion of task execution models failed for query with filters: %s.", exec_filters, ) else: @@ -148,7 +153,11 @@ def purge_task_execution(logger, timestamp, purge_incomplete=False): ) if zombie_execution_instances > 0: - logger.error("Zombie execution instances left: %d.", zombie_execution_instances) + logger.error( + "Zombie task execution instances left: %d.", zombie_execution_instances + ) # Print stats - logger.info("All execution models older than timestamp %s were deleted.", timestamp) + logger.info( + "All task execution models older than timestamp %s were deleted.", timestamp + ) diff --git a/st2common/tests/unit/test_purge_task_executions.py b/st2common/tests/unit/test_purge_task_executions.py index e4c72495ee..a3a35196fd 100644 --- a/st2common/tests/unit/test_purge_task_executions.py +++ b/st2common/tests/unit/test_purge_task_executions.py @@ -1,4 +1,4 @@ -# Copyright 2020 The StackStorm Authors. +# Copyright 2022 The StackStorm Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,7 +16,7 @@ from datetime import timedelta from st2common import log as logging -from st2common.garbage_collection.workflows import purge_task_execution +from st2common.garbage_collection.workflows import purge_task_executions from st2common.models.db.workflow import TaskExecutionDB from st2common.persistence.workflow import TaskExecution from st2common.util import date as date_utils @@ -47,7 +47,7 @@ def test_no_timestamp_doesnt_delete(self): self.assertEqual(len(TaskExecution.get_all()), 1) expected_msg = "Specify a valid timestamp" self.assertRaisesRegexp( - ValueError, expected_msg, purge_task_execution, logger=LOG, timestamp=None + ValueError, expected_msg, purge_task_executions, logger=LOG, timestamp=None ) self.assertEqual(len(TaskExecution.get_all()), 1) @@ -76,7 +76,7 @@ def test_purge(self): TaskExecution.add_or_update(instance_db) self.assertEqual(len(TaskExecution.get_all()), 3) - purge_task_execution(logger=LOG, timestamp=now - timedelta(days=10)) + purge_task_executions(logger=LOG, timestamp=now - timedelta(days=10)) self.assertEqual(len(TaskExecution.get_all()), 2) def test_purge_incomplete(self): @@ -104,7 +104,7 @@ def test_purge_incomplete(self): TaskExecution.add_or_update(instance_db) self.assertEqual(len(TaskExecution.get_all()), 3) - purge_task_execution( + purge_task_executions( logger=LOG, timestamp=now - timedelta(days=10), purge_incomplete=True ) self.assertEqual(len(TaskExecution.get_all()), 1) diff --git a/st2common/tests/unit/test_purge_worklows.py b/st2common/tests/unit/test_purge_worklows.py index c79147b9cb..713a0d1341 100644 --- a/st2common/tests/unit/test_purge_worklows.py +++ b/st2common/tests/unit/test_purge_worklows.py @@ -1,4 +1,4 @@ -# Copyright 2020 The StackStorm Authors. +# Copyright 2022 The StackStorm Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,7 +16,7 @@ from datetime import timedelta from st2common import log as logging -from st2common.garbage_collection.workflows import purge_workflow_execution +from st2common.garbage_collection.workflows import purge_workflow_executions from st2common.models.db.workflow import WorkflowExecutionDB from st2common.persistence.workflow import WorkflowExecution from st2common.util import date as date_utils @@ -49,7 +49,7 @@ def test_no_timestamp_doesnt_delete(self): self.assertRaisesRegexp( ValueError, expected_msg, - purge_workflow_execution, + purge_workflow_executions, logger=LOG, timestamp=None, ) @@ -79,7 +79,7 @@ def test_purge(self): WorkflowExecution.add_or_update(instance_db) self.assertEqual(len(WorkflowExecution.get_all()), 3) - purge_workflow_execution(logger=LOG, timestamp=now - timedelta(days=10)) + purge_workflow_executions(logger=LOG, timestamp=now - timedelta(days=10)) self.assertEqual(len(WorkflowExecution.get_all()), 2) def test_purge_incomplete(self): @@ -106,7 +106,7 @@ def test_purge_incomplete(self): WorkflowExecution.add_or_update(instance_db) self.assertEqual(len(WorkflowExecution.get_all()), 3) - purge_workflow_execution( + purge_workflow_executions( logger=LOG, timestamp=now - timedelta(days=10), purge_incomplete=True ) self.assertEqual(len(WorkflowExecution.get_all()), 1) diff --git a/st2reactor/st2reactor/garbage_collector/base.py b/st2reactor/st2reactor/garbage_collector/base.py index baba86734b..7ce22fd0fd 100644 --- a/st2reactor/st2reactor/garbage_collector/base.py +++ b/st2reactor/st2reactor/garbage_collector/base.py @@ -41,8 +41,8 @@ from st2common.garbage_collection.executions import purge_orphaned_workflow_executions from st2common.garbage_collection.inquiries import purge_inquiries from st2common.garbage_collection.workflows import ( - purge_workflow_execution, - purge_task_execution, + purge_workflow_executions, + purge_task_executions, ) from st2common.garbage_collection.trigger_instances import purge_trigger_instances from st2common.garbage_collection.trace import purge_traces @@ -260,7 +260,7 @@ def _perform_garbage_collection(self): else: LOG.debug(skip_message, obj_type) - obj_type = "task executions" + obj_type = "workflow task executions" if self._task_executions_ttl and self._task_executions_ttl >= MINIMUM_TTL_DAYS: LOG.info(proc_message, obj_type) self._purge_task_executions() @@ -311,8 +311,8 @@ def _purge_action_executions(self): def _purge_workflow_executions(self): """ - Purge workflow executions and corresponding live action, stdout and stderr object which match - the criteria defined in the config. + Purge workflow executions and corresponding live action, stdout and stderr + object which match the criteria defined in the config. """ utc_now = get_datetime_utc_now() timestamp = utc_now - datetime.timedelta(days=self._workflow_executions_ttl) @@ -329,7 +329,7 @@ def _purge_workflow_executions(self): assert timestamp < utc_now try: - purge_workflow_execution(logger=LOG, timestamp=timestamp) + purge_workflow_executions(logger=LOG, timestamp=timestamp) except Exception as e: LOG.exception( "Failed to delete workflow executions: %s" % (six.text_type(e)) @@ -339,8 +339,8 @@ def _purge_workflow_executions(self): def _purge_task_executions(self): """ - Purge task executions and corresponding live action, stdout and stderr object which match - the criteria defined in the config. + Purge workflow task executions and corresponding live action, stdout and stderr + object which match the criteria defined in the config. """ utc_now = get_datetime_utc_now() timestamp = utc_now - datetime.timedelta(days=self._task_executions_ttl) @@ -352,14 +352,16 @@ def _purge_task_executions(self): ) timestamp_str = isotime.format(dt=timestamp) - LOG.info("Deleting task executions older than: %s" % (timestamp_str)) + LOG.info("Deleting workflow task executions older than: %s" % (timestamp_str)) assert timestamp < utc_now try: - purge_task_execution(logger=LOG, timestamp=timestamp) + purge_task_executions(logger=LOG, timestamp=timestamp) except Exception as e: - LOG.exception("Failed to delete task executions: %s" % (six.text_type(e))) + LOG.exception( + "Failed to delete workflow task executions: %s" % (six.text_type(e)) + ) return True diff --git a/st2reactor/st2reactor/garbage_collector/config.py b/st2reactor/st2reactor/garbage_collector/config.py index 54327b9de2..9603ba8a7c 100644 --- a/st2reactor/st2reactor/garbage_collector/config.py +++ b/st2reactor/st2reactor/garbage_collector/config.py @@ -109,12 +109,16 @@ def _register_garbage_collector_opts(ignore_errors=False): cfg.IntOpt( "workflow_executions_ttl", default=None, - help="Workflow execution output objects (ones generated by action output streaming) older than this value (days) will be automatically deleted. Defaults to None (disabled).", + help="Workflow execution output objects (generated by action output " + "streaming) older than this value (days) will be automatically deleted. " + "Defaults to None (disabled).", ), cfg.IntOpt( "task_executions_ttl", default=None, - help="Task execution output objects (ones generated by action output streaming) older than this value (days) will be automatically deleted. Defaults to None (disabled).", + help="Workflow task execution output objects (generated by action output " + "streaming) older than this value (days) will be automatically deleted. " + "Defaults to None (disabled).", ), ] diff --git a/st2tests/st2tests/config.py b/st2tests/st2tests/config.py index 3732f8c83d..61e4417414 100644 --- a/st2tests/st2tests/config.py +++ b/st2tests/st2tests/config.py @@ -497,12 +497,16 @@ def _register_garbage_collector_opts(): cfg.IntOpt( "workflow_executions_ttl", default=None, - help="Workflow execution output objects (ones generated by action output streaming) older than this value (days) will be automatically deleted. Defaults to None (disabled).", + help="Workflow execution output objects (generated by action output " + "streaming) older than this value (days) will be automatically deleted. " + "Defaults to None (disabled).", ), cfg.IntOpt( "task_executions_ttl", default=None, - help="Task execution output objects (ones generated by action output streaming) older than this value (days) will be automatically deleted. Defaults to None (disabled).", + help="Workflow task execution output objects (generated by action output " + "streaming) older than this value (days) will be automatically deleted. " + "Defaults to None (disabled).", ), ] From a4786838636523d9ce33365eace14c76a1a0dc47 Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Fri, 1 Apr 2022 09:36:27 -0500 Subject: [PATCH 13/13] reformat with black --- st2common/st2common/cmd/purge_task_executions.py | 2 +- st2common/st2common/cmd/purge_workflows.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/st2common/st2common/cmd/purge_task_executions.py b/st2common/st2common/cmd/purge_task_executions.py index 61f196484c..470a79861a 100644 --- a/st2common/st2common/cmd/purge_task_executions.py +++ b/st2common/st2common/cmd/purge_task_executions.py @@ -55,7 +55,7 @@ def _register_cli_opts(): "purge-incomplete", default=False, help="Purge all models irrespective of their ``status``." - + 'By default, only workflow task executions in completed states such as ' + + "By default, only workflow task executions in completed states such as " + '"succeeeded", "failed", "canceled" and "timed_out" are deleted.', ), ] diff --git a/st2common/st2common/cmd/purge_workflows.py b/st2common/st2common/cmd/purge_workflows.py index b7995db6db..c1f6725c59 100644 --- a/st2common/st2common/cmd/purge_workflows.py +++ b/st2common/st2common/cmd/purge_workflows.py @@ -55,7 +55,7 @@ def _register_cli_opts(): "purge-incomplete", default=False, help="Purge all models irrespective of their ``status``." - + 'By default, only workflow executions in completed states such as ' + + "By default, only workflow executions in completed states such as " + '"succeeeded", "failed", "canceled" and "timed_out" are deleted.', ), ]