Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ Changed

Contributed by @khushboobhatia01

* Add new ``abandon_wait_period`` config option which specifies how many seconds actionrunner
waits during shutdown before it starts abandoning incomplete executions.

Defaults to 0 which means no wait is performed and executions are abandoned immediately (same as the default behavior in StackStorm <= 3.5.0).

Set this config if you want short running executions to complete before actionrunner starts abandoning them.

Contributed by @khushboobhatia01

3.5.0 - June 23, 2021
---------------------

Expand Down
2 changes: 2 additions & 0 deletions conf/st2.conf.sample
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ emit_when = succeeded,failed,timeout,canceled,abandoned # comma separated list a
enable = True

[actionrunner]
# Wait period in seconds after which actionrunner starts abandoning incomplete executions during shutdown. Default value of 0 means that no wait is performed and executions are abandoned immediately (same as the default behavior in StackStorm <= 3.5.0).
abandon_wait_period = 0
# Internal pool size for dispatcher used by regular actions.
actions_pool_size = 60
# location of the logging.conf file
Expand Down
11 changes: 11 additions & 0 deletions st2actions/st2actions/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import sys
import traceback

from oslo_config import cfg

from st2actions.container.base import RunnerContainer
from st2common import log as logging
from st2common.constants import action as action_constants
Expand All @@ -30,6 +32,7 @@
from st2common.transport.consumers import ActionsQueueConsumer
from st2common.transport import utils as transport_utils
from st2common.util import action_db as action_utils
from st2common.util import concurrency
from st2common.util import system_info
from st2common.transport import queues

Expand Down Expand Up @@ -134,6 +137,14 @@ def process(self, liveaction):

def shutdown(self):
super(ActionExecutionDispatcher, self).shutdown()
abandon_wait_period = cfg.CONF.actionrunner.abandon_wait_period
if abandon_wait_period:
LOG.debug(
"Sleeping for %s seconds before starting to abandon incomplete executions.",
abandon_wait_period,
)
concurrency.sleep(abandon_wait_period)

# Abandon running executions if incomplete
while self._running_liveactions:
liveaction_id = self._running_liveactions.pop()
Expand Down
60 changes: 60 additions & 0 deletions st2actions/tests/unit/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,63 @@ def test_worker_shutdown(self):
# _run_action but will not result in KeyError because the discard method is used to
# to remove the liveaction from _running_liveactions.
runner_thread.wait()

def test_worker_shutdown_with_abandon_wait_period(self):
    """Check that an execution may finish while shutdown is still waiting.

    ``abandon_wait_period`` is overridden to 30 seconds, a shell action that
    blocks on a temporary file is started, and the worker shutdown is run in
    a separate green thread. Removing the file lets the action complete, and
    the test then expects the liveaction to be recorded as succeeded and
    ``_running_liveactions`` to be empty.
    """
    cfg.CONF.set_override(
        name="abandon_wait_period", override=30, group="actionrunner"
    )
    # Undo the override even when an assertion fails, otherwise the 30 second
    # wait would leak into every later test that shuts a worker down.
    self.addCleanup(
        cfg.CONF.clear_override, "abandon_wait_period", group="actionrunner"
    )

    action_worker = actions_worker.get_worker()
    temp_file = None

    # Create a temporary file that is deleted when the file is closed and then set up an
    # action to wait for this file to be deleted. This allows this test to run the action
    # over a separate thread, run the shutdown sequence asynchronously, and then let
    # the local runner exit gracefully and allow _run_action to finish execution.
    with tempfile.NamedTemporaryFile() as fp:
        temp_file = fp.name
        self.assertIsNotNone(temp_file)
        self.assertTrue(os.path.isfile(temp_file))

        # Launch the action execution in a separate thread.
        params = {"cmd": "while [ -e '%s' ]; do sleep 0.1; done" % temp_file}
        liveaction_db = self._get_liveaction_model(
            WorkerTestCase.local_action_db, params
        )
        liveaction_db = LiveAction.add_or_update(liveaction_db)
        executions.create_execution_object(liveaction_db)
        runner_thread = eventlet.spawn(action_worker._run_action, liveaction_db)

        # Wait for the worker up to 10s to add the liveaction to _running_liveactions.
        for _ in range(0, int(10 / 0.1)):
            eventlet.sleep(0.1)
            if len(action_worker._running_liveactions) > 0:
                break

        self.assertEqual(len(action_worker._running_liveactions), 1)

        # Shutdown the worker asynchronously.
        shutdown_thread = eventlet.spawn(action_worker.shutdown)
        # The shutdown thread is still sleeping when the test ends; registering
        # the kill as a cleanup guarantees it is stopped on failures as well.
        self.addCleanup(shutdown_thread.kill)

    # Make sure the temporary file has been deleted.
    self.assertFalse(os.path.isfile(temp_file))

    # Wait for the worker up to 10s to remove the liveaction from _running_liveactions.
    for _ in range(0, int(10 / 0.1)):
        eventlet.sleep(0.1)
        if len(action_worker._running_liveactions) < 1:
            break
    liveaction_db = LiveAction.get_by_id(liveaction_db.id)

    # Verify that _running_liveactions is empty and the liveaction is succeeded.
    self.assertEqual(len(action_worker._running_liveactions), 0)
    self.assertEqual(
        liveaction_db.status,
        action_constants.LIVEACTION_STATUS_SUCCEEDED,
        str(liveaction_db),
    )

    # Wait for the local runner to complete. This will activate the finally block in
    # _run_action but will not result in KeyError because the discard method is used
    # to remove the liveaction from _running_liveactions.
    runner_thread.wait()
8 changes: 8 additions & 0 deletions st2common/st2common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,14 @@ def register_opts(ignore_errors=False):
"that size"
),
),
# Grace period applied by the action runner during shutdown before incomplete
# executions are abandoned. Each help fragment ends with a space because
# adjacent string literals are concatenated without any separator.
cfg.IntOpt(
    "abandon_wait_period",
    default=0,
    help=(
        "Wait period in seconds after which actionrunner starts abandoning "
        "incomplete executions during shutdown. "
        "Default value of 0 means that no wait is performed and executions "
        "are abandoned immediately (same as the default behavior in "
        "StackStorm <= 3.5.0)."
    ),
),
]

do_register_opts(
Expand Down