diff --git a/CHANGELOG.rst b/CHANGELOG.rst index f3f3d768e3..5744d5cd31 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -37,6 +37,15 @@ Changed Contributed by @khushboobhatia01 +* Add new ``abandon_wait_period`` config option which makes actionrunner wait for the given number + of seconds before it starts abandoning incomplete executions during shutdown. + + Defaults to 0 which means no wait is performed and executions are abandoned immediately (same as the default behavior in StackStorm <= 3.5.0). + + Set this config if you want short running executions to complete before actionrunner starts abandoning them. + + Contributed by @khushboobhatia01 + 3.5.0 - June 23, 2021 --------------------- diff --git a/conf/st2.conf.sample b/conf/st2.conf.sample index 62e1e00f6d..50b9da9f68 100644 --- a/conf/st2.conf.sample +++ b/conf/st2.conf.sample @@ -8,6 +8,8 @@ emit_when = succeeded,failed,timeout,canceled,abandoned # comma separated list a enable = True [actionrunner] +# Wait period in seconds after which actionrunner starts abandoning incomplete executions during shutdown. Default value of 0 means that no wait is performed and executions are abandoned immediately (same as the default behavior in StackStorm <= 3.5.0). +abandon_wait_period = 0 # Internal pool size for dispatcher used by regular actions. 
actions_pool_size = 60 # location of the logging.conf file diff --git a/st2actions/st2actions/worker.py b/st2actions/st2actions/worker.py index 1741d60724..39d7ba9331 100644 --- a/st2actions/st2actions/worker.py +++ b/st2actions/st2actions/worker.py @@ -17,6 +17,8 @@ import sys import traceback +from oslo_config import cfg + from st2actions.container.base import RunnerContainer from st2common import log as logging from st2common.constants import action as action_constants @@ -30,6 +32,7 @@ from st2common.transport.consumers import ActionsQueueConsumer from st2common.transport import utils as transport_utils from st2common.util import action_db as action_utils +from st2common.util import concurrency from st2common.util import system_info from st2common.transport import queues @@ -134,6 +137,14 @@ def process(self, liveaction): def shutdown(self): super(ActionExecutionDispatcher, self).shutdown() + abandon_wait_period = cfg.CONF.actionrunner.abandon_wait_period + if abandon_wait_period: + LOG.debug( + "Sleeping for %s seconds before starting to abandon incomplete executions.", + abandon_wait_period, + ) + concurrency.sleep(abandon_wait_period) + # Abandon running executions if incomplete while self._running_liveactions: liveaction_id = self._running_liveactions.pop() diff --git a/st2actions/tests/unit/test_worker.py b/st2actions/tests/unit/test_worker.py index d8637b9ac7..a8e7bf1780 100644 --- a/st2actions/tests/unit/test_worker.py +++ b/st2actions/tests/unit/test_worker.py @@ -164,3 +164,63 @@ def test_worker_shutdown(self): # _run_action but will not result in KeyError because the discard method is used to # to remove the liveaction from _running_liveactions. 
runner_thread.wait() + + def test_worker_shutdown_with_abandon_wait_period(self): + cfg.CONF.set_override( + name="abandon_wait_period", override=30, group="actionrunner" + ) + action_worker = actions_worker.get_worker() + temp_file = None + + # Create a temporary file that is deleted when the file is closed and then set up an + # action to wait for this file to be deleted. This allows this test to run the action + # over a separate thread, run the shutdown sequence on the main thread, and then let + # the local runner exit gracefully and allow _run_action to finish execution. + with tempfile.NamedTemporaryFile() as fp: + temp_file = fp.name + self.assertIsNotNone(temp_file) + self.assertTrue(os.path.isfile(temp_file)) + + # Launch the action execution in a separate thread. + params = {"cmd": "while [ -e '%s' ]; do sleep 0.1; done" % temp_file} + liveaction_db = self._get_liveaction_model( + WorkerTestCase.local_action_db, params + ) + liveaction_db = LiveAction.add_or_update(liveaction_db) + executions.create_execution_object(liveaction_db) + runner_thread = eventlet.spawn(action_worker._run_action, liveaction_db) + + # Wait for the worker up to 10s to add the liveaction to _running_liveactions. + for i in range(0, int(10 / 0.1)): + eventlet.sleep(0.1) + if len(action_worker._running_liveactions) > 0: + break + + self.assertEqual(len(action_worker._running_liveactions), 1) + + # Shutdown the worker asynchronously. + shutdown_thread = eventlet.spawn(action_worker.shutdown) + + # Make sure the temporary file has been deleted. + self.assertFalse(os.path.isfile(temp_file)) + + # Wait for the worker up to 10s to remove the liveaction from _running_liveactions. + for i in range(0, int(10 / 0.1)): + eventlet.sleep(0.1) + if len(action_worker._running_liveactions) < 1: + break + liveaction_db = LiveAction.get_by_id(liveaction_db.id) + + # Verify that _running_liveactions is empty and the liveaction is succeeded. 
+ self.assertEqual(len(action_worker._running_liveactions), 0) + self.assertEqual( + liveaction_db.status, + action_constants.LIVEACTION_STATUS_SUCCEEDED, + str(liveaction_db), + ) + + # Wait for the local runner to complete. This will activate the finally block in + # _run_action but will not result in KeyError because the discard method is used to + # remove the liveaction from _running_liveactions. + runner_thread.wait() + shutdown_thread.kill() diff --git a/st2common/st2common/config.py b/st2common/st2common/config.py index cf2ada4ee3..b5006fa4e2 100644 --- a/st2common/st2common/config.py +++ b/st2common/st2common/config.py @@ -477,6 +477,14 @@ def register_opts(ignore_errors=False): "that size" ), ), + cfg.IntOpt( + "abandon_wait_period", + default=0, + help=( + "Wait period in seconds after which actionrunner starts abandoning incomplete executions during shutdown. " + "Default value of 0 means that no wait is performed and executions are abandoned immediately (same as the default behavior in StackStorm <= 3.5.0)." + ), + ), ] do_register_opts(