From de8a64c3be77273570b6f250bbefd11b064f4fe6 Mon Sep 17 00:00:00 2001 From: Khushboo Date: Thu, 11 Nov 2021 16:45:53 +0530 Subject: [PATCH] Add system info to liveaction --- .../orquesta_runner/tests/unit/test_basic.py | 2 + st2actions/st2actions/workflows/workflows.py | 2 + st2common/st2common/services/workflows.py | 38 +++++++++++++++++++ 3 files changed, 42 insertions(+) diff --git a/contrib/runners/orquesta_runner/tests/unit/test_basic.py b/contrib/runners/orquesta_runner/tests/unit/test_basic.py index 5f5c60a012..7f06e25626 100644 --- a/contrib/runners/orquesta_runner/tests/unit/test_basic.py +++ b/contrib/runners/orquesta_runner/tests/unit/test_basic.py @@ -48,6 +48,7 @@ from st2common.transport import liveaction as lv_ac_xport from st2common.transport import workflow as wf_ex_xport from st2common.transport import publishers +from st2common.util import system_info from st2tests.mocks import liveaction as mock_lv_ac_xport from st2tests.mocks import workflow as mock_wf_ex_xport @@ -165,6 +166,7 @@ def test_run_workflow(self): expected_lv_ac_ctx = { "workflow_execution": str(wf_ex_db.id), "pack": "orquesta_tests", + "engine_info": system_info.get_process_info(), } self.assertDictEqual(lv_ac_db.context, expected_lv_ac_ctx) diff --git a/st2actions/st2actions/workflows/workflows.py b/st2actions/st2actions/workflows/workflows.py index 2151c7d440..a10427b907 100644 --- a/st2actions/st2actions/workflows/workflows.py +++ b/st2actions/st2actions/workflows/workflows.py @@ -125,11 +125,13 @@ def fail_workflow_execution(self, message, exception): wf_svc.update_progress(wf_ex_db, msg % (msg_type, wf_ex_id), severity="error") wf_svc.fail_workflow_execution(wf_ex_id, exception, task=task) + @wf_svc.add_system_info_to_action_context def handle_workflow_execution(self, wf_ex_db): # Request the next set of tasks to execute. wf_svc.update_progress(wf_ex_db, "Processing request for workflow execution.") wf_svc.request_next_tasks(wf_ex_db) + @wf_svc.add_system_info_to_action_context def handle_action_execution(self, ac_ex_db): # Exit if action execution is not executed under an orquesta workflow. if not wf_svc.is_action_execution_under_workflow_context(ac_ex_db): diff --git a/st2common/st2common/services/workflows.py b/st2common/st2common/services/workflows.py index 067583f303..c2b4ef288f 100644 --- a/st2common/st2common/services/workflows.py +++ b/st2common/st2common/services/workflows.py @@ -20,6 +20,8 @@ import retrying import six +from functools import wraps + from orquesta import conducting from orquesta import events from orquesta import exceptions as orquesta_exc @@ -49,6 +51,7 @@ from st2common.util import action_db as action_utils from st2common.util import date as date_utils from st2common.util import param as param_utils +from st2common.util import system_info LOG = logging.getLogger(__name__) @@ -1561,3 +1564,38 @@ def identify_orphaned_workflows(): continue return orphaned + + +def add_system_info_to_action_context(func): + @wraps(func) + def wrapper(*args, **kwargs): + engine_info = system_info.get_process_info() + + # Argument will be of type WorkflowExecutionDB/ActionExecutionDB only. + if kwargs.get("wf_ex_db"): + ac_ex = ex_db_access.ActionExecution.get_by_id( + kwargs["wf_ex_db"].action_execution + ) + lv_ex = lv_db_access.LiveAction.get_by_id(ac_ex.liveaction["id"]) + elif kwargs.get("ac_ex_db"): + lv_ex = lv_db_access.LiveAction.get_by_id( + kwargs["ac_ex_db"].liveaction["id"] + ) + else: + raise ValueError("Invalid message type") + + lv_ex.context.update({"engine_info": engine_info}) + lv_db_access.LiveAction.add_or_update(lv_ex, publish=False) + + try: + return func(*args, **kwargs) + except Exception as e: + raise e + finally: + lv_ex = lv_db_access.LiveAction.get_by_id(lv_ex.id) + try: + del lv_ex.context["engine_info"] + except KeyError: + pass + + return wrapper