Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions conf/st2.conf.sample
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,8 @@ webui_base_url = https://localhost
gc_max_idle_sec = 0
# Location of the logging configuration file.
logging = /etc/st2/logging.workflowengine.conf
# How much time to give to the request in progress to finish in seconds before killing workflow engine.
request_shutdown_time = 0
# Max jitter interval to smooth out retries.
retry_max_jitter_msec = 1000
# Max time to stop retrying.
Expand Down
3 changes: 3 additions & 0 deletions conf/st2.tests.conf
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,6 @@ logging = st2actions/conf/logging.notifier.conf
[exporter]
logging = st2exporter/conf/logging.exporter.conf

[workflow_engine]
logging = st2actions/conf/syslog.workflowengine.conf
request_shutdown_time = 15
12 changes: 12 additions & 0 deletions st2actions/st2actions/workflows/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from __future__ import absolute_import

from orquesta import statuses
from oslo_config import cfg

from st2common.constants import action as ac_const
from st2common import log as logging
Expand All @@ -29,6 +30,7 @@
from st2common.transport import consumers
from st2common.transport import queues
from st2common.transport import utils as txpt_utils
from st2common.util import concurrency


LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -86,6 +88,16 @@ def process(self, message):
# the garbage collector will find and cancel these workflow executions.
self.fail_workflow_execution(message, e)

def shutdown(self):
super(WorkflowExecutionHandler, self).shutdown()
engine_request_shutdown_time = cfg.CONF.workflow_engine.request_shutdown_time
if engine_request_shutdown_time:
LOG.info(
"Sleeping for %s seconds before engine shutdown...",
engine_request_shutdown_time,
)
concurrency.sleep(engine_request_shutdown_time)

def fail_workflow_execution(self, message, exception):
# Prepare attributes based on message type.
if isinstance(message, wf_db_models.WorkflowExecutionDB):
Expand Down
85 changes: 85 additions & 0 deletions st2actions/tests/integration/test_workflow_engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# Copyright 2021 The StackStorm Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import os
import sys
import signal

import psutil

from st2common.util import concurrency
from st2tests.base import IntegrationTestCase

__all__ = ["WorkflowEngineTestCase"]

BASE_DIR = os.path.dirname(os.path.abspath(__file__))

ST2_CONFIG_PATH = os.path.join(BASE_DIR, "../../../conf/st2.tests.conf")

ST2_CONFIG_PATH = os.path.abspath(ST2_CONFIG_PATH)

PYTHON_BINARY = sys.executable

BINARY = os.path.join(BASE_DIR, "../../../st2actions/bin/st2workflowengine")
BINARY = os.path.abspath(BINARY)

PACKS_BASE_PATH = os.path.abspath(os.path.join(BASE_DIR, "../../../contrib"))

DEFAULT_CMD = [PYTHON_BINARY, BINARY, "--config-file", ST2_CONFIG_PATH]


class WorkflowEngineTestCase(IntegrationTestCase):
@classmethod
def setUpClass(cls):
super(WorkflowEngineTestCase, cls).setUpClass()

def test_shutdown(self):
process = self._start_workflow_engine_container()

# Give it some time to start up
concurrency.sleep(7)

# Assert process has started and is running
self.assertProcessIsRunning(process=process)

pp = psutil.Process(process.pid)

# Send SIGTERM
process.send_signal(signal.SIGTERM)
concurrency.sleep(1)

# Assert process is still running after receiving SIGTREM signal.
self.assertProcessIsRunning(process)

# Wait for rquest shutdown time.
concurrency.sleep(15)

# Verify process has exited.
self.assertProcessExited(proc=pp)
self.remove_process(process=process)

def _start_workflow_engine_container(self):
subprocess = concurrency.get_subprocess_module()
print("Using command: %s" % (" ".join(DEFAULT_CMD)))
process = subprocess.Popen(
DEFAULT_CMD,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=False,
preexec_fn=os.setsid,
)
self.add_process(process=process)
return process
5 changes: 5 additions & 0 deletions st2common/st2common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,11 @@ def register_opts(ignore_errors=False):
"orphaned and cancelled by the garbage collector. A value of zero means the "
"feature is disabled. This is disabled by default.",
),
cfg.IntOpt(
"request_shutdown_time",
default=0,
help="How much time to give to the request in progress to finish in seconds before killing workflow engine.",
),
]

do_register_opts(
Expand Down
1 change: 1 addition & 0 deletions st2common/st2common/transport/consumers.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def __init__(self, connection, queues, handler):

def shutdown(self):
self._dispatcher.shutdown()
self.should_stop = True

def get_consumers(self, Consumer, channel):
consumer = Consumer(
Expand Down