From 13adf7cac3a8bfc60024cc06f763af91c45878b8 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Thu, 26 Aug 2021 12:32:54 +0530 Subject: [PATCH 1/7] Add wait_period before abandoning executions during shutdown --- conf/st2.conf.sample | 2 + st2actions/st2actions/worker.py | 10 +++++ st2actions/tests/unit/test_worker.py | 60 ++++++++++++++++++++++++++++ st2common/st2common/config.py | 7 ++++ 4 files changed, 79 insertions(+) diff --git a/conf/st2.conf.sample b/conf/st2.conf.sample index 62e1e00f6d..08c6d5e4ca 100644 --- a/conf/st2.conf.sample +++ b/conf/st2.conf.sample @@ -28,6 +28,8 @@ virtualenv_binary = /usr/bin/virtualenv virtualenv_opts = --system-site-packages # comma separated list allowed here. # Internal pool size for dispatcher used by workflow actions. workflows_pool_size = 40 +# Wait period in seconds after which actionrunner starts abandoning incomplete executions during shutdown. +abandon_wait_period = 0 [api] # List of origins allowed for api, auth and stream diff --git a/st2actions/st2actions/worker.py b/st2actions/st2actions/worker.py index 1741d60724..ed9f7c5a06 100644 --- a/st2actions/st2actions/worker.py +++ b/st2actions/st2actions/worker.py @@ -17,6 +17,8 @@ import sys import traceback +from oslo_config import cfg + from st2actions.container.base import RunnerContainer from st2common import log as logging from st2common.constants import action as action_constants @@ -30,6 +32,7 @@ from st2common.transport.consumers import ActionsQueueConsumer from st2common.transport import utils as transport_utils from st2common.util import action_db as action_utils +from st2common.util import concurrency from st2common.util import system_info from st2common.transport import queues @@ -134,6 +137,13 @@ def process(self, liveaction): def shutdown(self): super(ActionExecutionDispatcher, self).shutdown() + abandon_wait_period = cfg.CONF.actionrunner.abandon_wait_period + LOG.debug( + "Sleeping for %s seconds before starting to abandon incomplete executions.", + abandon_wait_period, + ) + concurrency.sleep(abandon_wait_period) + # Abandon running executions if incomplete while self._running_liveactions: liveaction_id = self._running_liveactions.pop() diff --git a/st2actions/tests/unit/test_worker.py b/st2actions/tests/unit/test_worker.py index d8637b9ac7..0d3876e0f5 100644 --- a/st2actions/tests/unit/test_worker.py +++ b/st2actions/tests/unit/test_worker.py @@ -164,3 +164,63 @@ def test_worker_shutdown(self): # _run_action but will not result in KeyError because the discard method is used to # to remove the liveaction from _running_liveactions. runner_thread.wait() + + def test_worker_shutdown_with_abandon_wait_period(self): + cfg.CONF.set_override( + name="abandon_wait_period", override=30, group="actionrunner" + ) + action_worker = actions_worker.get_worker() + temp_file = None + + # Create a temporary file that is deleted when the file is closed and then set up an + # action to wait for this file to be deleted. This allows this test to run the action + # over a separate thread, run the shutdown sequence on the main thread, and then let + # the local runner to exit gracefully and allow _run_action to finish execution. + fp = tempfile.NamedTemporaryFile() + temp_file = fp.name + self.assertIsNotNone(temp_file) + self.assertTrue(os.path.isfile(temp_file)) + + # Launch the action execution in a separate thread. + params = {"cmd": "while [ -e '%s' ]; do sleep 0.1; done" % temp_file} + liveaction_db = self._get_liveaction_model( + WorkerTestCase.local_action_db, params + ) + liveaction_db = LiveAction.add_or_update(liveaction_db) + executions.create_execution_object(liveaction_db) + runner_thread = eventlet.spawn(action_worker._run_action, liveaction_db) + + # Wait for the worker up to 10s to add the liveaction to _running_liveactions. + for i in range(0, int(10 / 0.1)): + eventlet.sleep(0.1) + if len(action_worker._running_liveactions) > 0: + break + + self.assertEqual(len(action_worker._running_liveactions), 1) + + # Shutdown the worker asynchronously. + shutdown_thread = eventlet.spawn(action_worker.shutdown) + + fp.close() + # Make sure the temporary file has been deleted. + self.assertFalse(os.path.isfile(temp_file)) + + # Wait for the worker up to 10s to remove the liveaction from _running_liveactions. + for i in range(0, int(10 / 0.1)): + eventlet.sleep(0.1) + if len(action_worker._running_liveactions) < 1: + break + liveaction_db = LiveAction.get_by_id(liveaction_db.id) + + # Verify that _running_liveactions is empty and the liveaction is succeeded. + self.assertEqual(len(action_worker._running_liveactions), 0) + self.assertEqual( + liveaction_db.status, + action_constants.LIVEACTION_STATUS_SUCCEEDED, + str(liveaction_db), + ) + + # Wait for the local runner to complete. This will activate the finally block in + # _run_action but will not result in KeyError because the discard method is used to + # to remove the liveaction from _running_liveactions. + runner_thread.wait() diff --git a/st2common/st2common/config.py b/st2common/st2common/config.py index cf2ada4ee3..f4c4848dc9 100644 --- a/st2common/st2common/config.py +++ b/st2common/st2common/config.py @@ -477,6 +477,13 @@ def register_opts(ignore_errors=False): "that size" ), ), + cfg.IntOpt( + "abandon_wait_period", + default=0, + help=( + "Wait period in seconds after which actionrunner starts abandoning incomplete executions during shutdown." + ), + ), ] do_register_opts( From c3beb4a2dd0b7e6562ff9880a94f436ed34d9007 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 6 Sep 2021 15:02:57 +0530 Subject: [PATCH 2/7] Review comments --- conf/st2.conf.sample | 2 +- st2actions/st2actions/worker.py | 11 ++++++----- st2common/st2common/config.py | 1 + 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/conf/st2.conf.sample b/conf/st2.conf.sample index 08c6d5e4ca..58e9ebb415 100644 --- a/conf/st2.conf.sample +++ b/conf/st2.conf.sample @@ -28,7 +28,7 @@ virtualenv_binary = /usr/bin/virtualenv virtualenv_opts = --system-site-packages # comma separated list allowed here. # Internal pool size for dispatcher used by workflow actions. workflows_pool_size = 40 -# Wait period in seconds after which actionrunner starts abandoning incomplete executions during shutdown. +# Wait period in seconds after which actionrunner starts abandoning incomplete executions during shutdown. Default value of 0 means that no wait is performed and executions are abandoned immediately (same as the default behavior in StackStorm <= 3.5.0). abandon_wait_period = 0 [api] diff --git a/st2actions/st2actions/worker.py b/st2actions/st2actions/worker.py index ed9f7c5a06..39d7ba9331 100644 --- a/st2actions/st2actions/worker.py +++ b/st2actions/st2actions/worker.py @@ -138,11 +138,12 @@ def process(self, liveaction): def shutdown(self): super(ActionExecutionDispatcher, self).shutdown() abandon_wait_period = cfg.CONF.actionrunner.abandon_wait_period - LOG.debug( - "Sleeping for %s seconds before starting to abandon incomplete executions.", - abandon_wait_period, - ) - concurrency.sleep(abandon_wait_period) + if abandon_wait_period: + LOG.debug( + "Sleeping for %s seconds before starting to abandon incomplete executions.", + abandon_wait_period, + ) + concurrency.sleep(abandon_wait_period) # Abandon running executions if incomplete while self._running_liveactions: diff --git a/st2common/st2common/config.py b/st2common/st2common/config.py index f4c4848dc9..b5006fa4e2 100644 --- a/st2common/st2common/config.py +++ b/st2common/st2common/config.py @@ -482,6 +482,7 @@ def register_opts(ignore_errors=False): default=0, help=( "Wait period in seconds after which actionrunner starts abandoning incomplete executions during shutdown." + "Default value of 0 means that no wait is performed and executions are abandoned immediately (same as the default behavior in StackStorm <= 3.5.0)." ), ), ] From ea7c7b2b0a226dbb36503d3e9caf95b21ef18066 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 6 Sep 2021 17:39:21 +0530 Subject: [PATCH 3/7] Review comments --- CHANGELOG.rst | 9 ++++++ conf/st2.conf.sample | 2 +- st2actions/tests/unit/test_worker.py | 42 ++++++++++++++-------------- 3 files changed, 31 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index f3f3d768e3..c35b83c140 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -37,6 +37,15 @@ Changed Contributed by @khushboobhatia01 +* Add new ``abandon_wait_period`` config option which will wait for grace period seconds after + which actionrunner starts abandoning incomplete executions during shutdown. + + Defaults to 0 which means no wait is performed and executions are abandoned immediately (same as the default behavior in StackStorm <= 3.5.0). + + Set this config if you want short running executions to complete before actionrunner starts abandoning them. + + Contributed by @khushboobhatia01 + 3.5.0 - June 23, 2021 --------------------- diff --git a/conf/st2.conf.sample b/conf/st2.conf.sample index 58e9ebb415..47e26e476b 100644 --- a/conf/st2.conf.sample +++ b/conf/st2.conf.sample @@ -29,7 +29,7 @@ virtualenv_opts = --system-site-packages # comma separated list allowed here. # Internal pool size for dispatcher used by workflow actions. workflows_pool_size = 40 # Wait period in seconds after which actionrunner starts abandoning incomplete executions during shutdown. Default value of 0 means that no wait is performed and executions are abandoned immediately (same as the default behavior in StackStorm <= 3.5.0). -abandon_wait_period = 0 +abandon_wait_period = [api] # List of origins allowed for api, auth and stream diff --git a/st2actions/tests/unit/test_worker.py b/st2actions/tests/unit/test_worker.py index 0d3876e0f5..a8e7bf1780 100644 --- a/st2actions/tests/unit/test_worker.py +++ b/st2actions/tests/unit/test_worker.py @@ -176,32 +176,31 @@ def test_worker_shutdown_with_abandon_wait_period(self): # action to wait for this file to be deleted. This allows this test to run the action # over a separate thread, run the shutdown sequence on the main thread, and then let # the local runner to exit gracefully and allow _run_action to finish execution. - fp = tempfile.NamedTemporaryFile() - temp_file = fp.name - self.assertIsNotNone(temp_file) - self.assertTrue(os.path.isfile(temp_file)) + with tempfile.NamedTemporaryFile() as fp: + temp_file = fp.name + self.assertIsNotNone(temp_file) + self.assertTrue(os.path.isfile(temp_file)) - # Launch the action execution in a separate thread. - params = {"cmd": "while [ -e '%s' ]; do sleep 0.1; done" % temp_file} - liveaction_db = self._get_liveaction_model( - WorkerTestCase.local_action_db, params - ) - liveaction_db = LiveAction.add_or_update(liveaction_db) - executions.create_execution_object(liveaction_db) - runner_thread = eventlet.spawn(action_worker._run_action, liveaction_db) + # Launch the action execution in a separate thread. + params = {"cmd": "while [ -e '%s' ]; do sleep 0.1; done" % temp_file} + liveaction_db = self._get_liveaction_model( + WorkerTestCase.local_action_db, params + ) + liveaction_db = LiveAction.add_or_update(liveaction_db) + executions.create_execution_object(liveaction_db) + runner_thread = eventlet.spawn(action_worker._run_action, liveaction_db) - # Wait for the worker up to 10s to add the liveaction to _running_liveactions. - for i in range(0, int(10 / 0.1)): - eventlet.sleep(0.1) - if len(action_worker._running_liveactions) > 0: - break + # Wait for the worker up to 10s to add the liveaction to _running_liveactions. + for i in range(0, int(10 / 0.1)): + eventlet.sleep(0.1) + if len(action_worker._running_liveactions) > 0: + break - self.assertEqual(len(action_worker._running_liveactions), 1) + self.assertEqual(len(action_worker._running_liveactions), 1) - # Shutdown the worker asynchronously. - shutdown_thread = eventlet.spawn(action_worker.shutdown) + # Shutdown the worker asynchronously. + shutdown_thread = eventlet.spawn(action_worker.shutdown) - fp.close() # Make sure the temporary file has been deleted. self.assertFalse(os.path.isfile(temp_file)) @@ -224,3 +223,4 @@ def test_worker_shutdown_with_abandon_wait_period(self): # _run_action but will not result in KeyError because the discard method is used to # to remove the liveaction from _running_liveactions. runner_thread.wait() + shutdown_thread.kill() From 9cc1881aace485780de1a157dd09ce17b20c9695 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Tue, 7 Sep 2021 09:05:18 +0530 Subject: [PATCH 4/7] Fix CI error --- conf/st2.conf.sample | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/conf/st2.conf.sample b/conf/st2.conf.sample index 47e26e476b..50b9da9f68 100644 --- a/conf/st2.conf.sample +++ b/conf/st2.conf.sample @@ -8,6 +8,8 @@ emit_when = succeeded,failed,timeout,canceled,abandoned # comma separated list a enable = True [actionrunner] +# Wait period in seconds after which actionrunner starts abandoning incomplete executions during shutdown.Default value of 0 means that no wait is performed and executions are abandoned immediately (same as the default behavior in StackStorm <= 3.5.0). +abandon_wait_period = 0 # Internal pool size for dispatcher used by regular actions. actions_pool_size = 60 # location of the logging.conf file @@ -28,8 +30,6 @@ virtualenv_binary = /usr/bin/virtualenv virtualenv_opts = --system-site-packages # comma separated list allowed here. # Internal pool size for dispatcher used by workflow actions. workflows_pool_size = 40 -# Wait period in seconds after which actionrunner starts abandoning incomplete executions during shutdown. Default value of 0 means that no wait is performed and executions are abandoned immediately (same as the default behavior in StackStorm <= 3.5.0). -abandon_wait_period = [api] # List of origins allowed for api, auth and stream From 4b9b32b93e6334423744e3150a69168e9c8749dd Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Tue, 7 Sep 2021 09:24:11 +0530 Subject: [PATCH 5/7] Remove trailing spaces from CHANGELOG --- CHANGELOG.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index c35b83c140..d0f94a7578 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -37,8 +37,8 @@ Changed Contributed by @khushboobhatia01 -* Add new ``abandon_wait_period`` config option which will wait for grace period seconds after - which actionrunner starts abandoning incomplete executions during shutdown. +* Add new ``abandon_wait_period`` config option which will wait for grace period seconds after + which actionrunner starts abandoning incomplete executions during shutdown. Defaults to 0 which means no wait is performed and executions are abandoned immediately (same as the default behavior in StackStorm <= 3.5.0). From d547b504b628d853cfc32b99a887ada955deee9b Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Tue, 7 Sep 2021 09:28:10 +0530 Subject: [PATCH 6/7] Remove trailing spaces from CHANGELOG --- CHANGELOG.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index d0f94a7578..54424c6f6c 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -39,7 +39,7 @@ Changed * Add new ``abandon_wait_period`` config option which will wait for grace period seconds after which actionrunner starts abandoning incomplete executions during shutdown. - + Defaults to 0 which means no wait is performed and executions are abandoned immediately (same as the default behavior in StackStorm <= 3.5.0). Set this config if you want short running executions to complete before actionrunner starts abandoning them. From ec38ad8fa33765eadc8955fc0fb3e1001fb8a519 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Tue, 7 Sep 2021 09:36:01 +0530 Subject: [PATCH 7/7] Remove trailing spaces from CHANGELOG --- CHANGELOG.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 54424c6f6c..5744d5cd31 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -39,7 +39,7 @@ Changed * Add new ``abandon_wait_period`` config option which will wait for grace period seconds after which actionrunner starts abandoning incomplete executions during shutdown. - + Defaults to 0 which means no wait is performed and executions are abandoned immediately (same as the default behavior in StackStorm <= 3.5.0). Set this config if you want short running executions to complete before actionrunner starts abandoning them.