From 6595200e26f11e8a32e7db2f1aa138041fd29707 Mon Sep 17 00:00:00 2001 From: Khushboo Date: Tue, 21 Sep 2021 17:09:52 +0530 Subject: [PATCH 1/3] Implement proper workflow engine shutdown --- conf/st2.conf.sample | 2 ++ st2actions/st2actions/workflows/workflows.py | 12 ++++++++++++ st2common/st2common/config.py | 5 +++++ st2common/st2common/transport/consumers.py | 1 + 4 files changed, 20 insertions(+) diff --git a/conf/st2.conf.sample b/conf/st2.conf.sample index 62e1e00f6d..52e47b9423 100644 --- a/conf/st2.conf.sample +++ b/conf/st2.conf.sample @@ -345,6 +345,8 @@ webui_base_url = https://localhost gc_max_idle_sec = 0 # Location of the logging configuration file. logging = /etc/st2/logging.workflowengine.conf +# How much time to give to the request in progress to finish in seconds before killing workflow engine. +request_shutdown_time = 0 # Max jitter interval to smooth out retries. retry_max_jitter_msec = 1000 # Max time to stop retrying. diff --git a/st2actions/st2actions/workflows/workflows.py b/st2actions/st2actions/workflows/workflows.py index 2151c7d440..33c03f358c 100644 --- a/st2actions/st2actions/workflows/workflows.py +++ b/st2actions/st2actions/workflows/workflows.py @@ -16,6 +16,7 @@ from __future__ import absolute_import from orquesta import statuses +from oslo_config import cfg from st2common.constants import action as ac_const from st2common import log as logging @@ -29,6 +30,7 @@ from st2common.transport import consumers from st2common.transport import queues from st2common.transport import utils as txpt_utils +from st2common.util import concurrency LOG = logging.getLogger(__name__) @@ -86,6 +88,16 @@ def process(self, message): # the garbage collector will find and cancel these workflow executions. self.fail_workflow_execution(message, e) + def shutdown(self): + super(WorkflowExecutionHandler, self).shutdown() + engine_request_shutdown_time = cfg.CONF.workflow_engine.request_shutdown_time + if engine_request_shutdown_time: + LOG.info( + "Sleeping for %s seconds before engine shutdown...", + engine_request_shutdown_time, + ) + concurrency.sleep(engine_request_shutdown_time) + def fail_workflow_execution(self, message, exception): # Prepare attributes based on message type. if isinstance(message, wf_db_models.WorkflowExecutionDB): diff --git a/st2common/st2common/config.py b/st2common/st2common/config.py index cf2ada4ee3..eb569e88a3 100644 --- a/st2common/st2common/config.py +++ b/st2common/st2common/config.py @@ -743,6 +743,11 @@ def register_opts(ignore_errors=False): "orphaned and cancelled by the garbage collector. A value of zero means the " "feature is disabled. This is disabled by default.", ), + cfg.IntOpt( + "request_shutdown_time", + default=0, + help="How much time to give to the request in progress to finish in seconds before killing workflow engine.", + ), ] do_register_opts( diff --git a/st2common/st2common/transport/consumers.py b/st2common/st2common/transport/consumers.py index 47752f035f..69a3fc4acb 100644 --- a/st2common/st2common/transport/consumers.py +++ b/st2common/st2common/transport/consumers.py @@ -44,6 +44,7 @@ def __init__(self, connection, queues, handler): def shutdown(self): self._dispatcher.shutdown() + self.should_stop = True def get_consumers(self, Consumer, channel): consumer = Consumer( From 748960714e70adeae6efe368f6892b9ab9917d6f Mon Sep 17 00:00:00 2001 From: Khushboo Date: Wed, 22 Sep 2021 14:55:23 +0530 Subject: [PATCH 2/3] Add integration test --- conf/st2.tests.conf | 3 + .../tests/integration/test_workflow_engine.py | 85 +++++++++++++++++++ 2 files changed, 88 insertions(+) create mode 100644 st2actions/tests/integration/test_workflow_engine.py diff --git a/conf/st2.tests.conf b/conf/st2.tests.conf index 0e9afeb288..408f7d2efb 100644 --- a/conf/st2.tests.conf +++ b/conf/st2.tests.conf @@ -90,3 +90,6 @@ logging = st2actions/conf/logging.notifier.conf [exporter] logging = st2exporter/conf/logging.exporter.conf +[workflow_engine] +logging = st2actions/conf/syslog.workflowengine.conf +request_shutdown_time = 15 diff --git a/st2actions/tests/integration/test_workflow_engine.py b/st2actions/tests/integration/test_workflow_engine.py new file mode 100644 index 0000000000..6c4cff69c9 --- /dev/null +++ b/st2actions/tests/integration/test_workflow_engine.py @@ -0,0 +1,85 @@ +# Copyright 2021 The StackStorm Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import + +import os +import sys +import signal + +import psutil + +from st2common.util import concurrency +from st2tests.base import IntegrationTestCase + +__all__ = ["WorkflowEngineTestCase"] + +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) + +ST2_CONFIG_PATH = os.path.join(BASE_DIR, "../../../conf/st2.tests.conf") + +ST2_CONFIG_PATH = os.path.abspath(ST2_CONFIG_PATH) + +PYTHON_BINARY = sys.executable + +BINARY = os.path.join(BASE_DIR, "../../../st2actions/bin/st2workflowengine") +BINARY = os.path.abspath(BINARY) + +PACKS_BASE_PATH = os.path.abspath(os.path.join(BASE_DIR, "../../../contrib")) + +DEFAULT_CMD = [PYTHON_BINARY, BINARY, "--config-file", ST2_CONFIG_PATH] + + +class WorkflowEngineTestCase(IntegrationTestCase): + @classmethod + def setUpClass(cls): + super(WorkflowEngineTestCase, cls).setUpClass() + + def test_shutdown(self): + process = self._start_workflow_engine_container() + + # Give it some time to start up + concurrency.sleep(7) + + # Assert process has started and is running + self.assertProcessIsRunning(process=process) + + pp = psutil.Process(process.pid) + + # Send SIGTERM + process.send_signal(signal.SIGTERM) + concurrency.sleep(1) + + # Assert process is still running after receiving SIGTREM signal. + self.assertProcessIsRunning(process) + + # Wait for rquest shutdown time. + concurrency.sleep(15) + + # Verify process has exited. + self.assertProcessExited(proc=pp) + self.remove_process(process=process) + + def _start_workflow_engine_container(self): + subprocess = concurrency.get_subprocess_module() + print("Using command: %s" % (" ".join(DEFAULT_CMD))) + process = subprocess.Popen( + DEFAULT_CMD, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=False, + preexec_fn=os.setsid, + ) + self.add_process(process=process) + return process From 8fa640d22066f288f359fd608560edc44d5f1979 Mon Sep 17 00:00:00 2001 From: Khushboo Date: Thu, 23 Sep 2021 09:32:44 +0530 Subject: [PATCH 3/3] Retrigger CI