diff --git a/CHANGELOG.rst b/CHANGELOG.rst index c1091ef86a..c4c9856e00 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -155,6 +155,9 @@ Added * Added garbage collection for rule_enforcement and trace models #5596/5602 Contributed by Amanda McGuinness (@amanda11 intive) +* Added garbage collection for workflow execution and task execution objects #4924 + Contributed by @srimandaleeka01 and @amanda11 + Fixed ~~~~~ diff --git a/conf/st2.conf.sample b/conf/st2.conf.sample index 01f5387e71..bb6bbd8d42 100644 --- a/conf/st2.conf.sample +++ b/conf/st2.conf.sample @@ -181,10 +181,14 @@ purge_inquiries = False rule_enforcements_ttl = None # How long to wait / sleep (in seconds) between collection of different object types. sleep_delay = 2 +# Workflow task execution output objects (generated by action output streaming) older than this value (days) will be automatically deleted. Defaults to None (disabled). +task_executions_ttl = None # Trace objects older than this value (days) will be automatically deleted. Defaults to None (disabled). traces_ttl = None # Trigger instances older than this value (days) will be automatically deleted. Defaults to None (disabled). trigger_instances_ttl = None +# Workflow execution output objects (generated by action output streaming) older than this value (days) will be automatically deleted. Defaults to None (disabled). +workflow_executions_ttl = None [keyvalue] # Allow encryption of values in key value stored qualified as "secret". diff --git a/st2common/bin/st2-purge-task-executions b/st2common/bin/st2-purge-task-executions new file mode 100644 index 0000000000..bff72f36b6 --- /dev/null +++ b/st2common/bin/st2-purge-task-executions @@ -0,0 +1,22 @@ +#!/usr/bin/env python +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +from st2common.cmd.purge_task_executions import main + +if __name__ == '__main__': + sys.exit(main()) diff --git a/st2common/bin/st2-purge-workflows b/st2common/bin/st2-purge-workflows new file mode 100644 index 0000000000..af90749993 --- /dev/null +++ b/st2common/bin/st2-purge-workflows @@ -0,0 +1,22 @@ +#!/usr/bin/env python +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import sys + +from st2common.cmd.purge_workflows import main + +if __name__ == '__main__': + sys.exit(main()) diff --git a/st2common/setup.py b/st2common/setup.py index 66f6324264..aa10d41dec 100644 --- a/st2common/setup.py +++ b/st2common/setup.py @@ -52,6 +52,8 @@ "bin/st2-cleanup-db", "bin/st2-register-content", "bin/st2-purge-executions", + "bin/st2-purge-workflows", + "bin/st2-purge-task-executions", "bin/st2-purge-trigger-instances", "bin/st2-purge-traces", "bin/st2-purge-rule-enforcements", diff --git a/st2common/st2common/cmd/purge_task_executions.py b/st2common/st2common/cmd/purge_task_executions.py new file mode 100644 index 0000000000..470a79861a --- /dev/null +++ b/st2common/st2common/cmd/purge_task_executions.py @@ -0,0 +1,90 @@ +# Copyright 2022 The StackStorm Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +""" +A utility script that purges st2 workflow task executions older than certain +timestamp. + +*** RISK RISK RISK. You will lose data. Run at your own risk. 
*** +""" + +from __future__ import absolute_import + +from datetime import datetime + +import six +import pytz +from oslo_config import cfg + +from st2common import config +from st2common import log as logging +from st2common.config import do_register_cli_opts +from st2common.script_setup import setup as common_setup +from st2common.script_setup import teardown as common_teardown +from st2common.constants.exit_codes import SUCCESS_EXIT_CODE +from st2common.constants.exit_codes import FAILURE_EXIT_CODE +from st2common.garbage_collection.workflows import purge_task_executions + +__all__ = ["main"] + +LOG = logging.getLogger(__name__) + + +def _register_cli_opts(): + cli_opts = [ + cfg.StrOpt( + "timestamp", + default=None, + help="Will delete workflow task execution objects older than " + + "this UTC timestamp. " + + "Example value: 2015-03-13T19:01:27.255542Z.", + ), + cfg.BoolOpt( + "purge-incomplete", + default=False, + help="Purge all models irrespective of their ``status``. " + + "By default, only workflow task executions in completed states such as " + + '"succeeded", "failed", "canceled" and "timed_out" are deleted.', + ), + ] + do_register_cli_opts(cli_opts) + + +def main(): + _register_cli_opts() + common_setup(config=config, setup_db=True, register_mq_exchanges=False) + + # Get config values + timestamp = cfg.CONF.timestamp + purge_incomplete = cfg.CONF.purge_incomplete + + if not timestamp: + LOG.error("Please supply a timestamp for purging models. 
Aborting.") + return 1 + else: + timestamp = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ") + timestamp = timestamp.replace(tzinfo=pytz.UTC) + + try: + purge_task_executions( + logger=LOG, timestamp=timestamp, purge_incomplete=purge_incomplete + ) + except Exception as e: + LOG.exception(six.text_type(e)) + return FAILURE_EXIT_CODE + finally: + common_teardown() + + return SUCCESS_EXIT_CODE diff --git a/st2common/st2common/cmd/purge_workflows.py b/st2common/st2common/cmd/purge_workflows.py new file mode 100644 index 0000000000..c1f6725c59 --- /dev/null +++ b/st2common/st2common/cmd/purge_workflows.py @@ -0,0 +1,90 @@ +# Copyright 2022 The StackStorm Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +""" +A utility script that purges st2 workflow executions older than certain +timestamp. + +*** RISK RISK RISK. You will lose data. Run at your own risk. 
*** +""" + +from __future__ import absolute_import + +from datetime import datetime + +import six +import pytz +from oslo_config import cfg + +from st2common import config +from st2common import log as logging +from st2common.config import do_register_cli_opts +from st2common.script_setup import setup as common_setup +from st2common.script_setup import teardown as common_teardown +from st2common.constants.exit_codes import SUCCESS_EXIT_CODE +from st2common.constants.exit_codes import FAILURE_EXIT_CODE +from st2common.garbage_collection.workflows import purge_workflow_executions + +__all__ = ["main"] + +LOG = logging.getLogger(__name__) + + +def _register_cli_opts(): + cli_opts = [ + cfg.StrOpt( + "timestamp", + default=None, + help="Will delete workflow execution objects older than " + + "this UTC timestamp. " + + "Example value: 2015-03-13T19:01:27.255542Z.", + ), + cfg.BoolOpt( + "purge-incomplete", + default=False, + help="Purge all models irrespective of their ``status``. " + + "By default, only workflow executions in completed states such as " + + '"succeeded", "failed", "canceled" and "timed_out" are deleted.', + ), + ] + do_register_cli_opts(cli_opts) + + +def main(): + _register_cli_opts() + common_setup(config=config, setup_db=True, register_mq_exchanges=False) + + # Get config values + timestamp = cfg.CONF.timestamp + purge_incomplete = cfg.CONF.purge_incomplete + + if not timestamp: + LOG.error("Please supply a timestamp for purging models. 
Aborting.") + return 1 + else: + timestamp = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ") + timestamp = timestamp.replace(tzinfo=pytz.UTC) + + try: + purge_workflow_executions( + logger=LOG, timestamp=timestamp, purge_incomplete=purge_incomplete + ) + except Exception as e: + LOG.exception(six.text_type(e)) + return FAILURE_EXIT_CODE + finally: + common_teardown() + + return SUCCESS_EXIT_CODE diff --git a/st2common/st2common/garbage_collection/workflows.py b/st2common/st2common/garbage_collection/workflows.py new file mode 100644 index 0000000000..d815124353 --- /dev/null +++ b/st2common/st2common/garbage_collection/workflows.py @@ -0,0 +1,163 @@ +# Copyright 2022 The StackStorm Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Module with utility functions for purging old workflow executions. +""" +from __future__ import absolute_import + +import copy + +import six +from mongoengine.errors import InvalidQueryError + +from st2common.constants import action as action_constants +from st2common.persistence.workflow import WorkflowExecution +from st2common.persistence.workflow import TaskExecution + + +__all__ = ["purge_workflow_executions", "purge_task_executions"] + +# TODO: Are these valid too.. 
+DONE_STATES = [ + action_constants.LIVEACTION_STATUS_SUCCEEDED, + action_constants.LIVEACTION_STATUS_FAILED, + action_constants.LIVEACTION_STATUS_TIMED_OUT, + action_constants.LIVEACTION_STATUS_CANCELED, +] + + +def purge_workflow_executions(logger, timestamp, purge_incomplete=False): + """ + Purge workflow execution objects. + + :param timestamp: Executions older than this timestamp will be deleted. + :type timestamp: ``datetime.datetime`` + + :param purge_incomplete: True to also delete executions which are not in a done state. + :type purge_incomplete: ``bool`` + """ + if not timestamp: + raise ValueError("Specify a valid timestamp to purge.") + + logger.info( + "Purging workflow executions older than timestamp: %s" + % timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + ) + + filters = {} + + if purge_incomplete: + filters["start_timestamp__lt"] = timestamp + else: + filters["end_timestamp__lt"] = timestamp + filters["start_timestamp__lt"] = timestamp + filters["status"] = {"$in": DONE_STATES} + + exec_filters = copy.copy(filters) + + # 1. Delete Workflow Execution objects + try: + # Note: We call list() on the query set object because it's lazily evaluated otherwise + # to_delete_execution_dbs = list(WorkflowExecution.query(only_fields=['id'], + # no_dereference=True, + # **exec_filters)) + deleted_count = WorkflowExecution.delete_by_query(**exec_filters) + except InvalidQueryError as e: + msg = ( + "Bad query (%s) used to delete workflow execution instances: %s. " + "Please contact support." 
% (exec_filters, six.text_type(e)) + ) + raise InvalidQueryError(msg) + except Exception: + logger.exception( + "Deletion of workflow execution models failed for query with filters: %s.", + exec_filters, + ) + else: + logger.info("Deleted %s workflow execution objects" % deleted_count) + + zombie_execution_instances = len( + WorkflowExecution.query(only_fields=["id"], no_dereference=True, **exec_filters) + ) + + if zombie_execution_instances > 0: + logger.error( + "Zombie workflow execution instances left: %d.", zombie_execution_instances + ) + + # Print stats + logger.info( + "All workflow execution models older than timestamp %s were deleted.", + timestamp, + ) + + +def purge_task_executions(logger, timestamp, purge_incomplete=False): + """ + Purge task execution objects. + + :param timestamp: Executions older than this timestamp will be deleted. + :type timestamp: ``datetime.datetime`` + + :param purge_incomplete: True to also delete executions which are not in a done state. + :type purge_incomplete: ``bool`` + """ + if not timestamp: + raise ValueError("Specify a valid timestamp to purge.") + + logger.info( + "Purging executions older than timestamp: %s" + % timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + ) + + filters = {} + + if purge_incomplete: + filters["start_timestamp__lt"] = timestamp + else: + filters["end_timestamp__lt"] = timestamp + filters["start_timestamp__lt"] = timestamp + filters["status"] = {"$in": DONE_STATES} + + exec_filters = copy.copy(filters) + try: + deleted_count = TaskExecution.delete_by_query(**exec_filters) + except InvalidQueryError as e: + msg = ( + "Bad query (%s) used to delete task execution instances: %s. " + "Please contact support." 
% (exec_filters, six.text_type(e)) + ) + raise InvalidQueryError(msg) + except Exception: + logger.exception( + "Deletion of task execution models failed for query with filters: %s.", + exec_filters, + ) + else: + logger.info("Deleted %s task execution objects" % deleted_count) + + zombie_execution_instances = len( + TaskExecution.query(only_fields=["id"], no_dereference=True, **exec_filters) + ) + + if zombie_execution_instances > 0: + logger.error( + "Zombie task execution instances left: %d.", zombie_execution_instances + ) + + # Print stats + logger.info( + "All task execution models older than timestamp %s were deleted.", timestamp + ) diff --git a/st2common/st2common/models/db/__init__.py b/st2common/st2common/models/db/__init__.py index 5e25bf4a19..d20afb78b2 100644 --- a/st2common/st2common/models/db/__init__.py +++ b/st2common/st2common/models/db/__init__.py @@ -737,6 +737,19 @@ def save(self, instance, validate=True): return self._undo_dict_field_escape(instance) + def delete(self, instance): + return instance.delete() + + def delete_by_query(self, *args, **query): + """ + Delete objects by query and return number of deleted objects. 
+ """ + qs = self.model.objects.filter(*args, **query) + count = qs.delete() + log_query_and_profile_data_for_queryset(queryset=qs) + + return count + def get_host_names_for_uri_dict(uri_dict): hosts = [] diff --git a/st2common/st2common/persistence/workflow.py b/st2common/st2common/persistence/workflow.py index 8d993ef4fe..49468bd9ef 100644 --- a/st2common/st2common/persistence/workflow.py +++ b/st2common/st2common/persistence/workflow.py @@ -39,6 +39,10 @@ def _get_publisher(cls): return cls.publisher + @classmethod + def delete_by_query(cls, *args, **query): + return cls._get_impl().delete_by_query(*args, **query) + class TaskExecution(persistence.StatusBasedResource): impl = db.ChangeRevisionMongoDBAccess(wf_db_models.TaskExecutionDB) @@ -47,3 +51,7 @@ class TaskExecution(persistence.StatusBasedResource): @classmethod def _get_impl(cls): return cls.impl + + @classmethod + def delete_by_query(cls, *args, **query): + return cls._get_impl().delete_by_query(*args, **query) diff --git a/st2common/tests/unit/test_purge_task_executions.py b/st2common/tests/unit/test_purge_task_executions.py new file mode 100644 index 0000000000..a3a35196fd --- /dev/null +++ b/st2common/tests/unit/test_purge_task_executions.py @@ -0,0 +1,110 @@ +# Copyright 2022 The StackStorm Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import absolute_import +from datetime import timedelta + +from st2common import log as logging +from st2common.garbage_collection.workflows import purge_task_executions +from st2common.models.db.workflow import TaskExecutionDB +from st2common.persistence.workflow import TaskExecution +from st2common.util import date as date_utils +from st2tests.base import CleanDbTestCase + +LOG = logging.getLogger(__name__) + + +class TestPurgeTaskExecutionInstances(CleanDbTestCase): + @classmethod + def setUpClass(cls): + CleanDbTestCase.setUpClass() + super(TestPurgeTaskExecutionInstances, cls).setUpClass() + + def setUp(self): + super(TestPurgeTaskExecutionInstances, self).setUp() + + def test_no_timestamp_doesnt_delete(self): + now = date_utils.get_datetime_utc_now() + + instance_db = TaskExecutionDB( + start_timestamp=now - timedelta(days=20), + end_timestamp=now - timedelta(days=20), + status="succeeded", + ) + TaskExecution.add_or_update(instance_db) + + self.assertEqual(len(TaskExecution.get_all()), 1) + expected_msg = "Specify a valid timestamp" + self.assertRaisesRegexp( + ValueError, expected_msg, purge_task_executions, logger=LOG, timestamp=None + ) + self.assertEqual(len(TaskExecution.get_all()), 1) + + def test_purge(self): + now = date_utils.get_datetime_utc_now() + + instance_db = TaskExecutionDB( + start_timestamp=now - timedelta(days=20), + end_timestamp=now - timedelta(days=20), + status="failed", + ) + TaskExecution.add_or_update(instance_db) + + # Addn incomplete + instance_db = TaskExecutionDB( + start_timestamp=now - timedelta(days=20), + status="running", + ) + TaskExecution.add_or_update(instance_db) + + instance_db = TaskExecutionDB( + start_timestamp=now - timedelta(days=5), + end_timestamp=now - timedelta(days=5), + status="canceled", + ) + TaskExecution.add_or_update(instance_db) + + self.assertEqual(len(TaskExecution.get_all()), 3) + purge_task_executions(logger=LOG, timestamp=now - timedelta(days=10)) + 
self.assertEqual(len(TaskExecution.get_all()), 2) + + def test_purge_incomplete(self): + now = date_utils.get_datetime_utc_now() + + instance_db = TaskExecutionDB( + start_timestamp=now - timedelta(days=20), + end_timestamp=now - timedelta(days=20), + status="failed", + ) + TaskExecution.add_or_update(instance_db) + + # Addn incomplete + instance_db = TaskExecutionDB( + start_timestamp=now - timedelta(days=20), + status="running", + ) + TaskExecution.add_or_update(instance_db) + + instance_db = TaskExecutionDB( + start_timestamp=now - timedelta(days=5), + end_timestamp=now - timedelta(days=5), + status="canceled", + ) + TaskExecution.add_or_update(instance_db) + + self.assertEqual(len(TaskExecution.get_all()), 3) + purge_task_executions( + logger=LOG, timestamp=now - timedelta(days=10), purge_incomplete=True + ) + self.assertEqual(len(TaskExecution.get_all()), 1) diff --git a/st2common/tests/unit/test_purge_worklows.py b/st2common/tests/unit/test_purge_worklows.py new file mode 100644 index 0000000000..713a0d1341 --- /dev/null +++ b/st2common/tests/unit/test_purge_worklows.py @@ -0,0 +1,112 @@ +# Copyright 2022 The StackStorm Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import absolute_import +from datetime import timedelta + +from st2common import log as logging +from st2common.garbage_collection.workflows import purge_workflow_executions +from st2common.models.db.workflow import WorkflowExecutionDB +from st2common.persistence.workflow import WorkflowExecution +from st2common.util import date as date_utils +from st2tests.base import CleanDbTestCase + +LOG = logging.getLogger(__name__) + + +class TestPurgeWorkflowExecutionInstances(CleanDbTestCase): + @classmethod + def setUpClass(cls): + CleanDbTestCase.setUpClass() + super(TestPurgeWorkflowExecutionInstances, cls).setUpClass() + + def setUp(self): + super(TestPurgeWorkflowExecutionInstances, self).setUp() + + def test_no_timestamp_doesnt_delete(self): + now = date_utils.get_datetime_utc_now() + + instance_db = WorkflowExecutionDB( + start_timestamp=now - timedelta(days=20), + end_timestamp=now - timedelta(days=20), + status="running", + ) + WorkflowExecution.add_or_update(instance_db) + + self.assertEqual(len(WorkflowExecution.get_all()), 1) + expected_msg = "Specify a valid timestamp" + self.assertRaisesRegexp( + ValueError, + expected_msg, + purge_workflow_executions, + logger=LOG, + timestamp=None, + ) + self.assertEqual(len(WorkflowExecution.get_all()), 1) + + def test_purge(self): + now = date_utils.get_datetime_utc_now() + + instance_db = WorkflowExecutionDB( + start_timestamp=now - timedelta(days=20), + end_timestamp=now - timedelta(days=20), + status="failed", + ) + WorkflowExecution.add_or_update(instance_db) + + instance_db = WorkflowExecutionDB( + start_timestamp=now - timedelta(days=20), + status="running", + ) + WorkflowExecution.add_or_update(instance_db) + + instance_db = WorkflowExecutionDB( + start_timestamp=now - timedelta(days=5), + end_timestamp=now - timedelta(days=5), + status="succeeded", + ) + WorkflowExecution.add_or_update(instance_db) + + self.assertEqual(len(WorkflowExecution.get_all()), 3) + purge_workflow_executions(logger=LOG, 
timestamp=now - timedelta(days=10)) + self.assertEqual(len(WorkflowExecution.get_all()), 2) + + def test_purge_incomplete(self): + now = date_utils.get_datetime_utc_now() + + instance_db = WorkflowExecutionDB( + start_timestamp=now - timedelta(days=20), + end_timestamp=now - timedelta(days=20), + status="cancelled", + ) + WorkflowExecution.add_or_update(instance_db) + + instance_db = WorkflowExecutionDB( + start_timestamp=now - timedelta(days=20), + status="running", + ) + WorkflowExecution.add_or_update(instance_db) + + instance_db = WorkflowExecutionDB( + start_timestamp=now - timedelta(days=5), + end_timestamp=now - timedelta(days=5), + status="succeeded", + ) + WorkflowExecution.add_or_update(instance_db) + + self.assertEqual(len(WorkflowExecution.get_all()), 3) + purge_workflow_executions( + logger=LOG, timestamp=now - timedelta(days=10), purge_incomplete=True + ) + self.assertEqual(len(WorkflowExecution.get_all()), 1) diff --git a/st2reactor/st2reactor/garbage_collector/base.py b/st2reactor/st2reactor/garbage_collector/base.py index 9fa606825c..7ce22fd0fd 100644 --- a/st2reactor/st2reactor/garbage_collector/base.py +++ b/st2reactor/st2reactor/garbage_collector/base.py @@ -40,6 +40,10 @@ from st2common.garbage_collection.executions import purge_execution_output_objects from st2common.garbage_collection.executions import purge_orphaned_workflow_executions from st2common.garbage_collection.inquiries import purge_inquiries +from st2common.garbage_collection.workflows import ( + purge_workflow_executions, + purge_task_executions, +) from st2common.garbage_collection.trigger_instances import purge_trigger_instances from st2common.garbage_collection.trace import purge_traces from st2common.garbage_collection.rule_enforcement import purge_rule_enforcements @@ -75,6 +79,10 @@ def __init__( self._rule_enforcements_ttl = cfg.CONF.garbagecollector.rule_enforcements_ttl self._purge_inquiries = cfg.CONF.garbagecollector.purge_inquiries self._workflow_execution_max_idle = 
cfg.CONF.workflow_engine.gc_max_idle_sec + self._workflow_executions_ttl = ( + cfg.CONF.garbagecollector.workflow_executions_ttl + ) + self._task_executions_ttl = cfg.CONF.garbagecollector.task_executions_ttl self._validate_ttl_values() @@ -252,6 +260,25 @@ def _perform_garbage_collection(self): else: LOG.debug(skip_message, obj_type) + obj_type = "workflow task executions" + if self._task_executions_ttl and self._task_executions_ttl >= MINIMUM_TTL_DAYS: + LOG.info(proc_message, obj_type) + self._purge_task_executions() + concurrency.sleep(self._sleep_delay) + else: + LOG.debug(skip_message, obj_type) + + obj_type = "workflow executions" + if ( + self._workflow_executions_ttl + and self._workflow_executions_ttl >= MINIMUM_TTL_DAYS + ): + LOG.info(proc_message, obj_type) + self._purge_workflow_executions() + concurrency.sleep(self._sleep_delay) + else: + LOG.debug(skip_message, obj_type) + def _purge_action_executions(self): """ Purge action executions and corresponding live action, stdout and stderr object which match @@ -282,6 +309,62 @@ def _purge_action_executions(self): return True + def _purge_workflow_executions(self): + """ + Purge workflow execution objects older than the configured + ``workflow_executions_ttl`` (in days). 
 + """ + utc_now = get_datetime_utc_now() + timestamp = utc_now - datetime.timedelta(days=self._workflow_executions_ttl) + + # Another sanity check to make sure we don't delete new executions + if timestamp > (utc_now - datetime.timedelta(days=MINIMUM_TTL_DAYS)): + raise ValueError( + "Calculated timestamp would violate the minimum TTL constraint" + ) + + timestamp_str = isotime.format(dt=timestamp) + LOG.info("Deleting workflow executions older than: %s" % (timestamp_str)) + + assert timestamp < utc_now + + try: + purge_workflow_executions(logger=LOG, timestamp=timestamp) + except Exception as e: + LOG.exception( + "Failed to delete workflow executions: %s" % (six.text_type(e)) + ) + + return True + + def _purge_task_executions(self): + """ + Purge workflow task execution objects older than the configured + ``task_executions_ttl`` (in days). + """ + utc_now = get_datetime_utc_now() + timestamp = utc_now - datetime.timedelta(days=self._task_executions_ttl) + + # Another sanity check to make sure we don't delete new executions + if timestamp > (utc_now - datetime.timedelta(days=MINIMUM_TTL_DAYS)): + raise ValueError( + "Calculated timestamp would violate the minimum TTL constraint" + ) + + timestamp_str = isotime.format(dt=timestamp) + LOG.info("Deleting workflow task executions older than: %s" % (timestamp_str)) + + assert timestamp < utc_now + + try: + purge_task_executions(logger=LOG, timestamp=timestamp) + except Exception as e: + LOG.exception( + "Failed to delete workflow task executions: %s" % (six.text_type(e)) + ) + + return True + def _purge_action_executions_output(self): utc_now = get_datetime_utc_now() timestamp = utc_now - datetime.timedelta( diff --git a/st2reactor/st2reactor/garbage_collector/config.py b/st2reactor/st2reactor/garbage_collector/config.py index c70074939d..9603ba8a7c 100644 --- a/st2reactor/st2reactor/garbage_collector/config.py +++ b/st2reactor/st2reactor/garbage_collector/config.py @@ -106,6 +106,20 
@@ def _register_garbage_collector_opts(ignore_errors=False): default=None, help="Trace objects older than this value (days) will be automatically deleted. Defaults to None (disabled).", ), + cfg.IntOpt( + "workflow_executions_ttl", + default=None, + help="Workflow execution output objects (generated by action output " + "streaming) older than this value (days) will be automatically deleted. " + "Defaults to None (disabled).", + ), + cfg.IntOpt( + "task_executions_ttl", + default=None, + help="Workflow task execution output objects (generated by action output " + "streaming) older than this value (days) will be automatically deleted. " + "Defaults to None (disabled).", + ), ] common_config.do_register_opts( diff --git a/st2tests/st2tests/config.py b/st2tests/st2tests/config.py index 1d34159d8c..61e4417414 100644 --- a/st2tests/st2tests/config.py +++ b/st2tests/st2tests/config.py @@ -494,6 +494,20 @@ def _register_garbage_collector_opts(): default=None, help="Trace objects older than this value (days) will be automatically deleted. Defaults to None (disabled).", ), + cfg.IntOpt( + "workflow_executions_ttl", + default=None, + help="Workflow execution output objects (generated by action output " + "streaming) older than this value (days) will be automatically deleted. " + "Defaults to None (disabled).", + ), + cfg.IntOpt( + "task_executions_ttl", + default=None, + help="Workflow task execution output objects (generated by action output " + "streaming) older than this value (days) will be automatically deleted. " + "Defaults to None (disabled).", + ), ] _register_opts(ttl_opts, group="garbagecollector")