From a9a172a9b90e7b2ef7e015578c659d601e5b225a Mon Sep 17 00:00:00 2001 From: Khushboo Date: Mon, 15 Nov 2021 15:26:32 +0530 Subject: [PATCH 1/6] Action runner graceful shutdown --- st2actions/st2actions/worker.py | 30 +++++ st2actions/tests/unit/test_worker.py | 194 +++++++++++++++++++++++++++ st2common/st2common/config.py | 22 +++ 3 files changed, 246 insertions(+) diff --git a/st2actions/st2actions/worker.py b/st2actions/st2actions/worker.py index 1741d60724..4597461802 100644 --- a/st2actions/st2actions/worker.py +++ b/st2actions/st2actions/worker.py @@ -17,6 +17,9 @@ import sys import traceback +from tooz.coordination import GroupNotCreated +from oslo_config import cfg + from st2actions.container.base import RunnerContainer from st2common import log as logging from st2common.constants import action as action_constants @@ -24,12 +27,14 @@ from st2common.exceptions.db import StackStormDBObjectNotFoundError from st2common.models.db.liveaction import LiveActionDB from st2common.persistence.execution import ActionExecution +from st2common.services import coordination from st2common.services import executions from st2common.services import workflows as wf_svc from st2common.transport.consumers import MessageHandler from st2common.transport.consumers import ActionsQueueConsumer from st2common.transport import utils as transport_utils from st2common.util import action_db as action_utils +from st2common.util import concurrency from st2common.util import system_info from st2common.transport import queues @@ -134,7 +139,32 @@ def process(self, liveaction): def shutdown(self): super(ActionExecutionDispatcher, self).shutdown() + + if cfg.CONF.actionrunner.graceful_shutdown: + + coordinator = coordination.get_coordinator() + member_ids = [] + service = "actionrunner" + exit_timeout = cfg.CONF.actionrunner.exit_timeout + sleep_delay = cfg.CONF.actionrunner.sleep_delay + timeout = 0 + + while timeout < exit_timeout and self._running_liveactions: + try: + member_ids = list( + 
coordinator.get_members(service.encode("utf-8")).get() + ) + except GroupNotCreated: + pass + + # Check if there are other runners in service registry + if not member_ids: + break + timeout += sleep_delay + concurrency.sleep(sleep_delay) + # Abandon running executions if incomplete + while self._running_liveactions: liveaction_id = self._running_liveactions.pop() try: diff --git a/st2actions/tests/unit/test_worker.py b/st2actions/tests/unit/test_worker.py index d8637b9ac7..8770b0d536 100644 --- a/st2actions/tests/unit/test_worker.py +++ b/st2actions/tests/unit/test_worker.py @@ -28,6 +28,7 @@ from st2common.persistence.execution import ActionExecution from st2common.persistence.liveaction import LiveAction from st2common.services import executions +from st2common.services import coordination from st2common.util import date as date_utils from st2common.bootstrap import runnersregistrar as runners_registrar from local_runner.local_shell_command_runner import LocalShellCommandRunner @@ -164,3 +165,196 @@ def test_worker_shutdown(self): # _run_action but will not result in KeyError because the discard method is used to # to remove the liveaction from _running_liveactions. runner_thread.wait() + + @mock.patch.object( + coordination.NoOpDriver, + "get_members", + mock.MagicMock(return_value=coordination.NoOpAsyncResult("member-1")), + ) + def test_worker_graceful_shutdown_with_multiple_runners(self): + cfg.CONF.set_override( + name="graceful_shutdown", override=True, group="actionrunner" + ) + action_worker = actions_worker.get_worker() + temp_file = None + + # Create a temporary file that is deleted when the file is closed and then set up an + # action to wait for this file to be deleted. This allows this test to run the action + # over a separate thread, run the shutdown sequence on the main thread, and then let + # the local runner to exit gracefully and allow _run_action to finish execution. 
+ with tempfile.NamedTemporaryFile() as fp: + temp_file = fp.name + self.assertIsNotNone(temp_file) + self.assertTrue(os.path.isfile(temp_file)) + + # Launch the action execution in a separate thread. + params = {"cmd": "while [ -e '%s' ]; do sleep 0.1; done" % temp_file} + liveaction_db = self._get_liveaction_model( + WorkerTestCase.local_action_db, params + ) + liveaction_db = LiveAction.add_or_update(liveaction_db) + executions.create_execution_object(liveaction_db) + runner_thread = eventlet.spawn(action_worker._run_action, liveaction_db) + + # Wait for the worker up to 10s to add the liveaction to _running_liveactions. + for i in range(0, int(10 / 0.1)): + eventlet.sleep(0.1) + if len(action_worker._running_liveactions) > 0: + break + + self.assertEqual(len(action_worker._running_liveactions), 1) + + # Shutdown the worker; graceful shutdown waits for the running action to complete. + shutdown_thread = eventlet.spawn(action_worker.shutdown) + + # Make sure the temporary file has been deleted. + self.assertFalse(os.path.isfile(temp_file)) + + # Wait for the worker up to 10s to remove the liveaction from _running_liveactions. + for i in range(0, int(10 / 0.1)): + eventlet.sleep(0.1) + if len(action_worker._running_liveactions) < 1: + break + liveaction_db = LiveAction.get_by_id(liveaction_db.id) + + # Verify that _running_liveactions is empty and the liveaction is succeeded. + self.assertEqual(len(action_worker._running_liveactions), 0) + self.assertEqual( + liveaction_db.status, + action_constants.LIVEACTION_STATUS_SUCCEEDED, + str(liveaction_db), + ) + + # Wait for the local runner to complete. This will activate the finally block in + # _run_action but will not result in KeyError because the discard method is used to + # to remove the liveaction from _running_liveactions.
+ runner_thread.wait() + shutdown_thread.kill() + + def test_worker_graceful_shutdown_with_single_runner(self): + cfg.CONF.set_override( + name="graceful_shutdown", override=True, group="actionrunner" + ) + action_worker = actions_worker.get_worker() + temp_file = None + + # Create a temporary file that is deleted when the file is closed and then set up an + # action to wait for this file to be deleted. This allows this test to run the action + # over a separate thread, run the shutdown sequence on the main thread, and then let + # the local runner to exit gracefully and allow _run_action to finish execution. + with tempfile.NamedTemporaryFile() as fp: + temp_file = fp.name + self.assertIsNotNone(temp_file) + self.assertTrue(os.path.isfile(temp_file)) + + # Launch the action execution in a separate thread. + params = {"cmd": "while [ -e '%s' ]; do sleep 0.1; done" % temp_file} + liveaction_db = self._get_liveaction_model( + WorkerTestCase.local_action_db, params + ) + liveaction_db = LiveAction.add_or_update(liveaction_db) + executions.create_execution_object(liveaction_db) + runner_thread = eventlet.spawn(action_worker._run_action, liveaction_db) + + # Wait for the worker up to 10s to add the liveaction to _running_liveactions. + for i in range(0, int(10 / 0.1)): + eventlet.sleep(0.1) + if len(action_worker._running_liveactions) > 0: + break + + self.assertEqual(len(action_worker._running_liveactions), 1) + + # Shutdown the worker to trigger the abandon process. + shutdown_thread = eventlet.spawn(action_worker.shutdown) + + # Make sure the temporary file has been deleted. + self.assertFalse(os.path.isfile(temp_file)) + + # Wait for the worker up to 10s to remove the liveaction from _running_liveactions. + for i in range(0, int(10 / 0.1)): + eventlet.sleep(0.1) + if len(action_worker._running_liveactions) < 1: + break + liveaction_db = LiveAction.get_by_id(liveaction_db.id) + + # Verify that _running_liveactions is empty and the liveaction is abandoned. 
+ self.assertEqual(len(action_worker._running_liveactions), 0) + self.assertEqual( + liveaction_db.status, + action_constants.LIVEACTION_STATUS_ABANDONED, + str(liveaction_db), + ) + + # Wait for the local runner to complete. This will activate the finally block in + # _run_action but will not result in KeyError because the discard method is used to + # to remove the liveaction from _running_liveactions. + runner_thread.wait() + shutdown_thread.kill() + + @mock.patch.object( + coordination.NoOpDriver, + "get_members", + mock.MagicMock(return_value=coordination.NoOpAsyncResult("member-1")), + ) + def test_worker_graceful_shutdown_exit_timeout(self): + cfg.CONF.set_override( + name="graceful_shutdown", override=True, group="actionrunner" + ) + cfg.CONF.set_override(name="exit_timeout", override=5, group="actionrunner") + action_worker = actions_worker.get_worker() + temp_file = None + + # Create a temporary file that is deleted when the file is closed and then set up an + # action to wait for this file to be deleted. This allows this test to run the action + # over a separate thread, run the shutdown sequence on the main thread, and then let + # the local runner to exit gracefully and allow _run_action to finish execution. + with tempfile.NamedTemporaryFile() as fp: + temp_file = fp.name + self.assertIsNotNone(temp_file) + self.assertTrue(os.path.isfile(temp_file)) + + # Launch the action execution in a separate thread. + params = {"cmd": "while [ -e '%s' ]; do sleep 0.1; done" % temp_file} + liveaction_db = self._get_liveaction_model( + WorkerTestCase.local_action_db, params + ) + liveaction_db = LiveAction.add_or_update(liveaction_db) + executions.create_execution_object(liveaction_db) + runner_thread = eventlet.spawn(action_worker._run_action, liveaction_db) + + # Wait for the worker up to 10s to add the liveaction to _running_liveactions. 
+ for i in range(0, int(10 / 0.1)): + eventlet.sleep(0.1) + if len(action_worker._running_liveactions) > 0: + break + + self.assertEqual(len(action_worker._running_liveactions), 1) + + # Shutdown the worker to trigger the abandon process. + shutdown_thread = eventlet.spawn(action_worker.shutdown) + # Continue the execution for 5+ seconds to ensure timeout occurs. + eventlet.sleep(6) + + # Make sure the temporary file has been deleted. + self.assertFalse(os.path.isfile(temp_file)) + + # Wait for the worker up to 10s to remove the liveaction from _running_liveactions. + for i in range(0, int(10 / 0.1)): + eventlet.sleep(0.1) + if len(action_worker._running_liveactions) < 1: + break + liveaction_db = LiveAction.get_by_id(liveaction_db.id) + + # Verify that _running_liveactions is empty and the liveaction is abandoned. + self.assertEqual(len(action_worker._running_liveactions), 0) + self.assertEqual( + liveaction_db.status, + action_constants.LIVEACTION_STATUS_ABANDONED, + str(liveaction_db), + ) + + # Wait for the local runner to complete. This will activate the finally block in + # _run_action but will not result in KeyError because the discard method is used to + # to remove the liveaction from _running_liveactions.
+ runner_thread.wait() + shutdown_thread.kill() diff --git a/st2common/st2common/config.py b/st2common/st2common/config.py index 0c002a8e2f..b34d15fd40 100644 --- a/st2common/st2common/config.py +++ b/st2common/st2common/config.py @@ -500,6 +500,28 @@ def register_opts(ignore_errors=False): dispatcher_pool_opts, group="actionrunner", ignore_errors=ignore_errors ) + graceful_shutdown_opts = [ + cfg.BoolOpt( + "graceful_shutdown", + default=False, + help="This will enable the graceful shutdown and wait for ongoing requests to complete until exit_timeout.", + ), + cfg.IntOpt( + "exit_timeout", + default=300, + help="How long to wait for process (in seconds) to exit after receiving shutdown signal.", + ), + cfg.IntOpt( + "sleep_delay", + default=2, + help="Time interval between subsequent queries to check running executions.", + ), + ] + + do_register_opts( + graceful_shutdown_opts, group="actionrunner", ignore_errors=ignore_errors + ) + ssh_runner_opts = [ cfg.StrOpt( "remote_dir", From 4141812a2938af9f8c5cec4c74ad94f2c1e55a6d Mon Sep 17 00:00:00 2001 From: Khushboo Date: Mon, 15 Nov 2021 15:51:30 +0530 Subject: [PATCH 2/6] Fix LINT errors --- conf/st2.conf.sample | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/conf/st2.conf.sample b/conf/st2.conf.sample index 62e1e00f6d..54d3b67a70 100644 --- a/conf/st2.conf.sample +++ b/conf/st2.conf.sample @@ -10,6 +10,10 @@ enable = True [actionrunner] # Internal pool size for dispatcher used by regular actions. actions_pool_size = 60 +# How long to wait for process (in seconds) to exit after receiving shutdown signal. +exit_timeout = 300 +# This will enable the graceful shutdown and wait for ongoing requests to complete until exit_timeout. +graceful_shutdown = False # location of the logging.conf file logging = /etc/st2/logging.actionrunner.conf # List of pip options to be passed to "pip install" command when installing pack dependencies into pack virtual environment. 
@@ -18,6 +22,8 @@ pip_opts = # comma separated list allowed here. python_binary = /usr/bin/python # Default log level to use for Python runner actions. Can be overriden on invocation basis using "log_level" runner parameter. python_runner_log_level = DEBUG +# Time interval between subsequent queries to check running executions. +sleep_delay = 2 # True to store and stream action output (stdout and stderr) in real-time. stream_output = True # Buffer size to use for real time action output streaming. 0 means unbuffered 1 means line buffered, -1 means system default, which usually means fully buffered and any other positive value means use a buffer of (approximately) that size From f96cf6c92a903fb7222889abb734c55aa5392a6b Mon Sep 17 00:00:00 2001 From: Khushboo Date: Mon, 15 Nov 2021 16:12:17 +0530 Subject: [PATCH 3/6] Fix failing unit test --- st2actions/tests/unit/test_worker.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/st2actions/tests/unit/test_worker.py b/st2actions/tests/unit/test_worker.py index 8770b0d536..cc51bf556a 100644 --- a/st2actions/tests/unit/test_worker.py +++ b/st2actions/tests/unit/test_worker.py @@ -266,6 +266,8 @@ def test_worker_graceful_shutdown_with_single_runner(self): # Shutdown the worker to trigger the abandon process. shutdown_thread = eventlet.spawn(action_worker.shutdown) + # Wait for action runner shutdown sequence to complete + eventlet.sleep(5) # Make sure the temporary file has been deleted. 
self.assertFalse(os.path.isfile(temp_file)) From 507feb5a35c69196b612c3f1a729a4c81e0e7ccd Mon Sep 17 00:00:00 2001 From: Khushboo Date: Thu, 13 Jan 2022 15:58:13 +0530 Subject: [PATCH 4/6] Review comments --- conf/st2.conf.sample | 9 +++------ st2actions/st2actions/worker.py | 4 ++-- st2actions/tests/unit/test_worker.py | 7 ++++++- st2common/st2common/config.py | 6 +++--- 4 files changed, 14 insertions(+), 12 deletions(-) diff --git a/conf/st2.conf.sample b/conf/st2.conf.sample index 54d3b67a70..59eebc51c0 100644 --- a/conf/st2.conf.sample +++ b/conf/st2.conf.sample @@ -1,6 +1,3 @@ -# Sample config which contains all the available options which the corresponding descriptions -# Note: This file is automatically generated using tools/config_gen.py - DO NOT UPDATE MANUALLY - [action_sensor] # List of execution statuses for which a trigger will be emitted. emit_when = succeeded,failed,timeout,canceled,abandoned # comma separated list allowed here. @@ -11,9 +8,9 @@ enable = True # Internal pool size for dispatcher used by regular actions. actions_pool_size = 60 # How long to wait for process (in seconds) to exit after receiving shutdown signal. -exit_timeout = 300 +exit_still_active_check = 300 # This will enable the graceful shutdown and wait for ongoing requests to complete until exit_timeout. -graceful_shutdown = False +graceful_shutdown = True # location of the logging.conf file logging = /etc/st2/logging.actionrunner.conf # List of pip options to be passed to "pip install" command when installing pack dependencies into pack virtual environment. @@ -23,7 +20,7 @@ python_binary = /usr/bin/python # Default log level to use for Python runner actions. Can be overriden on invocation basis using "log_level" runner parameter. python_runner_log_level = DEBUG # Time interval between subsequent queries to check running executions. -sleep_delay = 2 +still_active_check_interval = 2 # True to store and stream action output (stdout and stderr) in real-time. 
stream_output = True # Buffer size to use for real time action output streaming. 0 means unbuffered 1 means line buffered, -1 means system default, which usually means fully buffered and any other positive value means use a buffer of (approximately) that size diff --git a/st2actions/st2actions/worker.py b/st2actions/st2actions/worker.py index 4597461802..30af0d56a7 100644 --- a/st2actions/st2actions/worker.py +++ b/st2actions/st2actions/worker.py @@ -145,8 +145,8 @@ def shutdown(self): coordinator = coordination.get_coordinator() member_ids = [] service = "actionrunner" - exit_timeout = cfg.CONF.actionrunner.exit_timeout - sleep_delay = cfg.CONF.actionrunner.sleep_delay + exit_timeout = cfg.CONF.actionrunner.exit_still_active_check + sleep_delay = cfg.CONF.actionrunner.still_active_check_interval timeout = 0 while timeout < exit_timeout and self._running_liveactions: diff --git a/st2actions/tests/unit/test_worker.py b/st2actions/tests/unit/test_worker.py index cc51bf556a..0cf4d730f7 100644 --- a/st2actions/tests/unit/test_worker.py +++ b/st2actions/tests/unit/test_worker.py @@ -117,6 +117,9 @@ def test_non_utf8_action_result_string(self): ) def test_worker_shutdown(self): + cfg.CONF.set_override( + name="graceful_shutdown", override=False, group="actionrunner" + ) action_worker = actions_worker.get_worker() temp_file = None @@ -302,7 +305,9 @@ def test_worker_graceful_shutdown_exit_timeout(self): cfg.CONF.set_override( name="graceful_shutdown", override=True, group="actionrunner" ) - cfg.CONF.set_override(name="exit_timeout", override=5, group="actionrunner") + cfg.CONF.set_override( + name="exit_still_active_check", override=5, group="actionrunner" + ) action_worker = actions_worker.get_worker() temp_file = None diff --git a/st2common/st2common/config.py b/st2common/st2common/config.py index b34d15fd40..8ad2f5d1e8 100644 --- a/st2common/st2common/config.py +++ b/st2common/st2common/config.py @@ -503,16 +503,16 @@ def register_opts(ignore_errors=False): 
graceful_shutdown_opts = [ cfg.BoolOpt( "graceful_shutdown", - default=False, + default=True, help="This will enable the graceful shutdown and wait for ongoing requests to complete until exit_timeout.", ), cfg.IntOpt( - "exit_timeout", + "exit_still_active_check", default=300, help="How long to wait for process (in seconds) to exit after receiving shutdown signal.", ), cfg.IntOpt( - "sleep_delay", + "still_active_check_interval", default=2, help="Time interval between subsequent queries to check running executions.", ), From 4a77bf8f9394263ecf346ff10a8e659873dd5b92 Mon Sep 17 00:00:00 2001 From: Khushboo Date: Thu, 13 Jan 2022 16:06:01 +0530 Subject: [PATCH 5/6] configgen --- conf/st2.conf.sample | 3 +++ 1 file changed, 3 insertions(+) diff --git a/conf/st2.conf.sample b/conf/st2.conf.sample index 59eebc51c0..9009cd0199 100644 --- a/conf/st2.conf.sample +++ b/conf/st2.conf.sample @@ -1,3 +1,6 @@ +# Sample config which contains all the available options which the corresponding descriptions +# Note: This file is automatically generated using tools/config_gen.py - DO NOT UPDATE MANUALLY + [action_sensor] # List of execution statuses for which a trigger will be emitted. emit_when = succeeded,failed,timeout,canceled,abandoned # comma separated list allowed here. From e3da36bda81cf6dffc26c41d878e9754fb206ec9 Mon Sep 17 00:00:00 2001 From: Khushboo <47312983+khushboobhatia01@users.noreply.github.com> Date: Mon, 17 Jan 2022 11:24:27 +0530 Subject: [PATCH 6/6] Update CHANGELOG.rst --- CHANGELOG.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 74f9616831..e033fee5d3 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -50,6 +50,10 @@ Added Contributed by @ktyogurt +* Implemented graceful shutdown for action runner. Enabled ``graceful_shutdown`` in ``st2.conf`` file. #5428 + + Contributed by @khushboobhatia01 + Fixed ~~~~~