diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 80bc94671e..83e57db146 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -56,6 +56,10 @@ Added Contributed by @ktyogurt +* Implemented graceful shutdown for action runner. Enabled ``graceful_shutdown`` in ``st2.conf`` file. #5428 + + Contributed by @khushboobhatia01 + Fixed ~~~~~ diff --git a/conf/st2.conf.sample b/conf/st2.conf.sample index 62e1e00f6d..9009cd0199 100644 --- a/conf/st2.conf.sample +++ b/conf/st2.conf.sample @@ -10,6 +10,10 @@ enable = True [actionrunner] # Internal pool size for dispatcher used by regular actions. actions_pool_size = 60 +# How long to wait for process (in seconds) to exit after receiving shutdown signal. +exit_still_active_check = 300 +# This will enable the graceful shutdown and wait for ongoing requests to complete until exit_timeout. +graceful_shutdown = True # location of the logging.conf file logging = /etc/st2/logging.actionrunner.conf # List of pip options to be passed to "pip install" command when installing pack dependencies into pack virtual environment. @@ -18,6 +22,8 @@ pip_opts = # comma separated list allowed here. python_binary = /usr/bin/python # Default log level to use for Python runner actions. Can be overriden on invocation basis using "log_level" runner parameter. python_runner_log_level = DEBUG +# Time interval between subsequent queries to check running executions. +still_active_check_interval = 2 # True to store and stream action output (stdout and stderr) in real-time. stream_output = True # Buffer size to use for real time action output streaming. 
0 means unbuffered 1 means line buffered, -1 means system default, which usually means fully buffered and any other positive value means use a buffer of (approximately) that size diff --git a/st2actions/st2actions/worker.py b/st2actions/st2actions/worker.py index 1741d60724..30af0d56a7 100644 --- a/st2actions/st2actions/worker.py +++ b/st2actions/st2actions/worker.py @@ -17,6 +17,9 @@ import sys import traceback +from tooz.coordination import GroupNotCreated +from oslo_config import cfg + from st2actions.container.base import RunnerContainer from st2common import log as logging from st2common.constants import action as action_constants @@ -24,12 +27,14 @@ from st2common.exceptions.db import StackStormDBObjectNotFoundError from st2common.models.db.liveaction import LiveActionDB from st2common.persistence.execution import ActionExecution +from st2common.services import coordination from st2common.services import executions from st2common.services import workflows as wf_svc from st2common.transport.consumers import MessageHandler from st2common.transport.consumers import ActionsQueueConsumer from st2common.transport import utils as transport_utils from st2common.util import action_db as action_utils +from st2common.util import concurrency from st2common.util import system_info from st2common.transport import queues @@ -134,7 +139,32 @@ def process(self, liveaction): def shutdown(self): super(ActionExecutionDispatcher, self).shutdown() + + if cfg.CONF.actionrunner.graceful_shutdown: + + coordinator = coordination.get_coordinator() + member_ids = [] + service = "actionrunner" + exit_timeout = cfg.CONF.actionrunner.exit_still_active_check + sleep_delay = cfg.CONF.actionrunner.still_active_check_interval + timeout = 0 + + while timeout < exit_timeout and self._running_liveactions: + try: + member_ids = list( + coordinator.get_members(service.encode("utf-8")).get() + ) + except GroupNotCreated: + pass + + # Check if there are other runners in service registry + if not 
member_ids: + break + timeout += sleep_delay + concurrency.sleep(sleep_delay) + # Abandon running executions if incomplete + while self._running_liveactions: liveaction_id = self._running_liveactions.pop() try: diff --git a/st2actions/tests/unit/test_worker.py b/st2actions/tests/unit/test_worker.py index d8637b9ac7..0cf4d730f7 100644 --- a/st2actions/tests/unit/test_worker.py +++ b/st2actions/tests/unit/test_worker.py @@ -28,6 +28,7 @@ from st2common.persistence.execution import ActionExecution from st2common.persistence.liveaction import LiveAction from st2common.services import executions +from st2common.services import coordination from st2common.util import date as date_utils from st2common.bootstrap import runnersregistrar as runners_registrar from local_runner.local_shell_command_runner import LocalShellCommandRunner @@ -116,6 +117,9 @@ def test_non_utf8_action_result_string(self): ) def test_worker_shutdown(self): + cfg.CONF.set_override( + name="graceful_shutdown", override=False, group="actionrunner" + ) action_worker = actions_worker.get_worker() temp_file = None @@ -164,3 +168,200 @@ def test_worker_shutdown(self): # _run_action but will not result in KeyError because the discard method is used to # to remove the liveaction from _running_liveactions. runner_thread.wait() + + @mock.patch.object( + coordination.NoOpDriver, + "get_members", + mock.MagicMock(return_value=coordination.NoOpAsyncResult("member-1")), + ) + def test_worker_graceful_shutdown_with_multiple_runners(self): + cfg.CONF.set_override( + name="graceful_shutdown", override=True, group="actionrunner" + ) + action_worker = actions_worker.get_worker() + temp_file = None + + # Create a temporary file that is deleted when the file is closed and then set up an + # action to wait for this file to be deleted. 
This allows this test to run the action + # over a separate thread, run the shutdown sequence on the main thread, and then let + # the local runner to exit gracefully and allow _run_action to finish execution. + with tempfile.NamedTemporaryFile() as fp: + temp_file = fp.name + self.assertIsNotNone(temp_file) + self.assertTrue(os.path.isfile(temp_file)) + + # Launch the action execution in a separate thread. + params = {"cmd": "while [ -e '%s' ]; do sleep 0.1; done" % temp_file} + liveaction_db = self._get_liveaction_model( + WorkerTestCase.local_action_db, params + ) + liveaction_db = LiveAction.add_or_update(liveaction_db) + executions.create_execution_object(liveaction_db) + runner_thread = eventlet.spawn(action_worker._run_action, liveaction_db) + + # Wait for the worker up to 10s to add the liveaction to _running_liveactions. + for i in range(0, int(10 / 0.1)): + eventlet.sleep(0.1) + if len(action_worker._running_liveactions) > 0: + break + + self.assertEqual(len(action_worker._running_liveactions), 1) + + # Shutdown the worker to trigger the abandon process. + shutdown_thread = eventlet.spawn(action_worker.shutdown) + + # Make sure the temporary file has been deleted. + self.assertFalse(os.path.isfile(temp_file)) + + # Wait for the worker up to 10s to remove the liveaction from _running_liveactions. + for i in range(0, int(10 / 0.1)): + eventlet.sleep(0.1) + if len(action_worker._running_liveactions) < 1: + break + liveaction_db = LiveAction.get_by_id(liveaction_db.id) + + # Verify that _running_liveactions is empty and the liveaction is succeeded. + self.assertEqual(len(action_worker._running_liveactions), 0) + self.assertEqual( + liveaction_db.status, + action_constants.LIVEACTION_STATUS_SUCCEEDED, + str(liveaction_db), + ) + + # Wait for the local runner to complete. This will activate the finally block in + # _run_action but will not result in KeyError because the discard method is used to + # to remove the liveaction from _running_liveactions. 
+ runner_thread.wait() + shutdown_thread.kill() + + def test_worker_graceful_shutdown_with_single_runner(self): + cfg.CONF.set_override( + name="graceful_shutdown", override=True, group="actionrunner" + ) + action_worker = actions_worker.get_worker() + temp_file = None + + # Create a temporary file that is deleted when the file is closed and then set up an + # action to wait for this file to be deleted. This allows this test to run the action + # over a separate thread, run the shutdown sequence on the main thread, and then let + # the local runner to exit gracefully and allow _run_action to finish execution. + with tempfile.NamedTemporaryFile() as fp: + temp_file = fp.name + self.assertIsNotNone(temp_file) + self.assertTrue(os.path.isfile(temp_file)) + + # Launch the action execution in a separate thread. + params = {"cmd": "while [ -e '%s' ]; do sleep 0.1; done" % temp_file} + liveaction_db = self._get_liveaction_model( + WorkerTestCase.local_action_db, params + ) + liveaction_db = LiveAction.add_or_update(liveaction_db) + executions.create_execution_object(liveaction_db) + runner_thread = eventlet.spawn(action_worker._run_action, liveaction_db) + + # Wait for the worker up to 10s to add the liveaction to _running_liveactions. + for i in range(0, int(10 / 0.1)): + eventlet.sleep(0.1) + if len(action_worker._running_liveactions) > 0: + break + + self.assertEqual(len(action_worker._running_liveactions), 1) + + # Shutdown the worker to trigger the abandon process. + shutdown_thread = eventlet.spawn(action_worker.shutdown) + # Wait for action runner shutdown sequence to complete + eventlet.sleep(5) + + # Make sure the temporary file has been deleted. + self.assertFalse(os.path.isfile(temp_file)) + + # Wait for the worker up to 10s to remove the liveaction from _running_liveactions. 
+ for i in range(0, int(10 / 0.1)): + eventlet.sleep(0.1) + if len(action_worker._running_liveactions) < 1: + break + liveaction_db = LiveAction.get_by_id(liveaction_db.id) + + # Verify that _running_liveactions is empty and the liveaction is abandoned. + self.assertEqual(len(action_worker._running_liveactions), 0) + self.assertEqual( + liveaction_db.status, + action_constants.LIVEACTION_STATUS_ABANDONED, + str(liveaction_db), + ) + + # Wait for the local runner to complete. This will activate the finally block in + # _run_action but will not result in KeyError because the discard method is used to + # to remove the liveaction from _running_liveactions. + runner_thread.wait() + shutdown_thread.kill() + + @mock.patch.object( + coordination.NoOpDriver, + "get_members", + mock.MagicMock(return_value=coordination.NoOpAsyncResult("member-1")), + ) + def test_worker_graceful_shutdown_exit_timeout(self): + cfg.CONF.set_override( + name="graceful_shutdown", override=True, group="actionrunner" + ) + cfg.CONF.set_override( + name="exit_still_active_check", override=5, group="actionrunner" + ) + action_worker = actions_worker.get_worker() + temp_file = None + + # Create a temporary file that is deleted when the file is closed and then set up an + # action to wait for this file to be deleted. This allows this test to run the action + # over a separate thread, run the shutdown sequence on the main thread, and then let + # the local runner to exit gracefully and allow _run_action to finish execution. + with tempfile.NamedTemporaryFile() as fp: + temp_file = fp.name + self.assertIsNotNone(temp_file) + self.assertTrue(os.path.isfile(temp_file)) + + # Launch the action execution in a separate thread. 
+ params = {"cmd": "while [ -e '%s' ]; do sleep 0.1; done" % temp_file} + liveaction_db = self._get_liveaction_model( + WorkerTestCase.local_action_db, params + ) + liveaction_db = LiveAction.add_or_update(liveaction_db) + executions.create_execution_object(liveaction_db) + runner_thread = eventlet.spawn(action_worker._run_action, liveaction_db) + + # Wait for the worker up to 10s to add the liveaction to _running_liveactions. + for i in range(0, int(10 / 0.1)): + eventlet.sleep(0.1) + if len(action_worker._running_liveactions) > 0: + break + + self.assertEqual(len(action_worker._running_liveactions), 1) + + # Shutdown the worker to trigger the abandon process. + shutdown_thread = eventlet.spawn(action_worker.shutdown) + # Continue the execution for 5+ seconds to ensure timeout occurs. + eventlet.sleep(6) + + # Make sure the temporary file has been deleted. + self.assertFalse(os.path.isfile(temp_file)) + + # Wait for the worker up to 10s to remove the liveaction from _running_liveactions. + for i in range(0, int(10 / 0.1)): + eventlet.sleep(0.1) + if len(action_worker._running_liveactions) < 1: + break + liveaction_db = LiveAction.get_by_id(liveaction_db.id) + + # Verify that _running_liveactions is empty and the liveaction is abandoned. + self.assertEqual(len(action_worker._running_liveactions), 0) + self.assertEqual( + liveaction_db.status, + action_constants.LIVEACTION_STATUS_ABANDONED, + str(liveaction_db), + ) + + # Wait for the local runner to complete. This will activate the finally block in + # _run_action but will not result in KeyError because the discard method is used to + # to remove the liveaction from _running_liveactions.
+ runner_thread.wait() + shutdown_thread.kill() diff --git a/st2common/st2common/config.py b/st2common/st2common/config.py index 0c002a8e2f..8ad2f5d1e8 100644 --- a/st2common/st2common/config.py +++ b/st2common/st2common/config.py @@ -500,6 +500,28 @@ def register_opts(ignore_errors=False): dispatcher_pool_opts, group="actionrunner", ignore_errors=ignore_errors ) + graceful_shutdown_opts = [ + cfg.BoolOpt( + "graceful_shutdown", + default=True, + help="This will enable the graceful shutdown and wait for ongoing requests to complete until exit_timeout.", + ), + cfg.IntOpt( + "exit_still_active_check", + default=300, + help="How long to wait for process (in seconds) to exit after receiving shutdown signal.", + ), + cfg.IntOpt( + "still_active_check_interval", + default=2, + help="Time interval between subsequent queries to check running executions.", + ), + ] + + do_register_opts( + graceful_shutdown_opts, group="actionrunner", ignore_errors=ignore_errors + ) + ssh_runner_opts = [ cfg.StrOpt( "remote_dir",