diff --git a/contrib/runners/orquesta_runner/tests/unit/test_error_handling.py b/contrib/runners/orquesta_runner/tests/unit/test_error_handling.py index 31a17b593c..361271b78b 100644 --- a/contrib/runners/orquesta_runner/tests/unit/test_error_handling.py +++ b/contrib/runners/orquesta_runner/tests/unit/test_error_handling.py @@ -34,6 +34,7 @@ from st2common.persistence import execution as ex_db_access from st2common.persistence import liveaction as lv_db_access from st2common.persistence import workflow as wf_db_access +from st2common.runners import utils as runners_utils from st2common.services import action as ac_svc from st2common.services import workflows as wf_svc from st2common.transport import liveaction as lv_ac_xport @@ -826,3 +827,69 @@ def test_fail_manually_with_recovery_failure(self): ] self.assertListEqual(self.sort_workflow_errors(wf_ex_db.errors), expected_errors) + + @mock.patch.object( + runners_utils, + 'invoke_post_run', + mock.MagicMock(return_value=None)) + def test_include_result_to_error_log(self): + username = 'stanley' + wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'sequential.yaml') + wf_input = {'who': 'Thanos'} + lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'], parameters=wf_input) + lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db) + + # Assert action execution is running. + lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id)) + self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result) + wf_ex_dbs = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id)) + wf_ex_db = wf_ex_dbs[0] + + # Assert task1 is already completed. + query_filters = {'workflow_execution': str(wf_ex_db.id), 'task_id': 'task1'} + tk1_ex_db = wf_db_access.TaskExecution.query(**query_filters)[0] + tk1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk1_ex_db.id))[0] + tk1_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk1_ac_ex_db.liveaction['id']) + self.assertEqual(tk1_lv_ac_db.context.get('user'), username) + self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED) + + # Manually override and fail the action execution and write some result. + # Action execution result can contain dotted notation so ensure this is tested. + result = {"127.0.0.1": {"hostname": "foobar"}} + + ac_svc.update_status( + tk1_lv_ac_db, + ac_const.LIVEACTION_STATUS_FAILED, + result=result, + publish=False + ) + + tk1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk1_ex_db.id))[0] + tk1_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk1_ac_ex_db.liveaction['id']) + self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED) + self.assertDictEqual(tk1_lv_ac_db.result, result) + + # Manually handle action execution completion. + wf_svc.handle_action_execution_completion(tk1_ac_ex_db) + + # Assert task and workflow failed. + tk1_ex_db = wf_db_access.TaskExecution.get_by_id(tk1_ex_db.id) + self.assertEqual(tk1_ex_db.status, wf_statuses.FAILED) + wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(wf_ex_db.id) + self.assertEqual(wf_ex_db.status, wf_statuses.FAILED) + + # Assert result is included in the error log. + expected_errors = [ + { + 'message': 'Execution failed. See result for details.', + 'type': 'error', + 'task_id': 'task1', + 'result': { + '127.0.0.1': { + 'hostname': 'foobar' + } + } + } + ] + + self.assertListEqual(wf_ex_db.errors, expected_errors) diff --git a/fixed-requirements.txt b/fixed-requirements.txt index af2c1baf05..730f5af8ce 100644 --- a/fixed-requirements.txt +++ b/fixed-requirements.txt @@ -35,6 +35,7 @@ cryptography==2.8 retrying==1.3.3 # Note: We use latest version of virtualenv which uses pip 19 virtualenv==16.6.0 +singledispatch==3.4.0.3 # NOTE: sseclient has various issues which sometimes hang the connection for a long time, etc. sseclient-py==1.7 python-editor==1.0.4 diff --git a/requirements.txt b/requirements.txt index 3898401f16..dc9723f37b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -56,6 +56,7 @@ requests[security]==2.23.0 retrying==1.3.3 routes==2.4.1 semver==2.9.0 +singledispatch==3.4.0.3 six==1.13.0 sseclient-py==1.7 stevedore==1.30.1 diff --git a/st2common/in-requirements.txt b/st2common/in-requirements.txt index 2871a4d4a7..f7af82a73e 100644 --- a/st2common/in-requirements.txt +++ b/st2common/in-requirements.txt @@ -33,6 +33,7 @@ ujson # Note: amqp is used by kombu, this needs to be added here to be picked up by # requirements fixate script. amqp +singledispatch # Used by st2-pack-* commands gitpython lockfile diff --git a/st2common/requirements.txt b/st2common/requirements.txt index 73ee717aeb..fce1db1ef0 100644 --- a/st2common/requirements.txt +++ b/st2common/requirements.txt @@ -31,6 +31,7 @@ requests[security]==2.23.0 retrying==1.3.3 routes==2.4.1 semver==2.9.0 +singledispatch==3.4.0.3 six==1.13.0 tooz==1.66.1 ujson==1.35 diff --git a/st2common/st2common/models/db/workflow.py b/st2common/st2common/models/db/workflow.py index 7c0c51c1f2..b642df8eb8 100644 --- a/st2common/st2common/models/db/workflow.py +++ b/st2common/st2common/models/db/workflow.py @@ -44,7 +44,7 @@ class WorkflowExecutionDB(stormbase.StormFoundationDB, stormbase.ChangeRevisionF state = stormbase.EscapedDictField() status = me.StringField(required=True) output = stormbase.EscapedDictField() - errors = me.DynamicField() + errors = stormbase.EscapedDynamicField() start_timestamp = db_field_types.ComplexDateTimeField(default=date_utils.get_datetime_utc_now) end_timestamp = db_field_types.ComplexDateTimeField() diff --git a/st2common/st2common/util/mongoescape.py b/st2common/st2common/util/mongoescape.py index a20db8f2a1..16012485a3 100644 --- a/st2common/st2common/util/mongoescape.py +++ b/st2common/st2common/util/mongoescape.py @@ -14,75 +14,59 @@ from __future__ import absolute_import +try: # Python 3 + from functools import singledispatch +except ImportError: # Python 2 + from singledispatch import singledispatch + import six from six.moves import zip from st2common.util.ujson import fast_deepcopy +# Note: Because of old rule escaping code, two different characters can be translated back to dot +RULE_CRITERIA_UNESCAPED = ['.'] +RULE_CRITERIA_ESCAPED = [u'\u2024'] +RULE_CRITERIA_ESCAPE_TRANSLATION = dict(list(zip(RULE_CRITERIA_UNESCAPED, RULE_CRITERIA_ESCAPED))) +RULE_CRITERIA_UNESCAPE_TRANSLATION = dict(list(zip(RULE_CRITERIA_ESCAPED, RULE_CRITERIA_UNESCAPED))) + # http://docs.mongodb.org/manual/faq/developers/#faq-dollar-sign-escaping UNESCAPED = ['.', '$'] ESCAPED = [u'\uFF0E', u'\uFF04'] ESCAPE_TRANSLATION = dict(list(zip(UNESCAPED, ESCAPED))) -UNESCAPE_TRANSLATION = dict(list(zip(ESCAPED, UNESCAPED))) +UNESCAPE_TRANSLATION = dict( + list(zip(ESCAPED, UNESCAPED)) + list(zip(RULE_CRITERIA_ESCAPED, RULE_CRITERIA_UNESCAPED)) +) -# Note: Because of old rule escaping code, two different characters can be translated back to dot -RULE_CRITERIA_UNESCAPED = ['.'] -RULE_CRITERIA_ESCAPED = [u'\u2024'] -RULE_CRITERIA_ESCAPE_TRANSLATION = dict(list(zip(RULE_CRITERIA_UNESCAPED, - RULE_CRITERIA_ESCAPED))) -RULE_CRITERIA_UNESCAPE_TRANSLATION = dict(list(zip(RULE_CRITERIA_ESCAPED, - RULE_CRITERIA_UNESCAPED))) +@singledispatch +def _translate_chars(field, translation): + return field -def _prep_work_items(d): - return [(k, v, d) for k, v in six.iteritems(d)] +@_translate_chars.register(list) +def _translate_chars_in_list(field, translation): + return [_translate_chars(value, translation) for value in field] -def _translate_chars(field, translation): - # Only translate the fields of a dict - if not isinstance(field, dict): - return field - - work_items = _prep_work_items(field) - - while len(work_items) > 0: - work_item = work_items.pop(0) - oldkey = work_item[0] - value = work_item[1] - work_field = work_item[2] - newkey = oldkey - - for t_k, t_v in six.iteritems(translation): - if t_k in newkey: - newkey = newkey.replace(t_k, t_v) - - if newkey != oldkey: - work_field[newkey] = value - del work_field[oldkey] - - if isinstance(value, dict): - work_items.extend(_prep_work_items(value)) - elif isinstance(value, list): - for item in value: - if isinstance(item, dict): - work_items.extend(_prep_work_items(item)) - return field +def _translate_chars_in_key(key, translation): + for k, v in [(k, v) for k, v in six.iteritems(translation) if k in key]: + key = key.replace(k, v) + return key -def escape_chars(field): - if not isinstance(field, dict): - return field - value = fast_deepcopy(field) - return _translate_chars(value, ESCAPE_TRANSLATION) +@_translate_chars.register(dict) +def _translate_chars_in_dict(field, translation): + return { + _translate_chars_in_key(k, translation): _translate_chars(v, translation) + for k, v in six.iteritems(field) + } -def unescape_chars(field): - if not isinstance(field, dict): - return field +def escape_chars(field): + return _translate_chars(fast_deepcopy(field), ESCAPE_TRANSLATION) - value = fast_deepcopy(field) - translated = _translate_chars(value, UNESCAPE_TRANSLATION) - translated = _translate_chars(value, RULE_CRITERIA_UNESCAPE_TRANSLATION) - return translated + +def unescape_chars(field): + return _translate_chars(fast_deepcopy(field), UNESCAPE_TRANSLATION)