Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions contrib/runners/orquesta_runner/tests/unit/test_error_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from st2common.persistence import execution as ex_db_access
from st2common.persistence import liveaction as lv_db_access
from st2common.persistence import workflow as wf_db_access
from st2common.runners import utils as runners_utils
from st2common.services import action as ac_svc
from st2common.services import workflows as wf_svc
from st2common.transport import liveaction as lv_ac_xport
Expand Down Expand Up @@ -826,3 +827,69 @@ def test_fail_manually_with_recovery_failure(self):
]

self.assertListEqual(self.sort_workflow_errors(wf_ex_db.errors), expected_errors)

@mock.patch.object(
runners_utils,
'invoke_post_run',
mock.MagicMock(return_value=None))
def test_include_result_to_error_log(self):
username = 'stanley'
wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'sequential.yaml')
wf_input = {'who': 'Thanos'}
lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'], parameters=wf_input)
lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)

# Assert action execution is running.
lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result)
wf_ex_dbs = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))
wf_ex_db = wf_ex_dbs[0]

# Assert task1 is already completed.
query_filters = {'workflow_execution': str(wf_ex_db.id), 'task_id': 'task1'}
tk1_ex_db = wf_db_access.TaskExecution.query(**query_filters)[0]
tk1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk1_ex_db.id))[0]
tk1_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk1_ac_ex_db.liveaction['id'])
self.assertEqual(tk1_lv_ac_db.context.get('user'), username)
self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)

# Manually override and fail the action execution and write some result.
# Action execution result can contain dotted notation so ensure this is tested.
result = {"127.0.0.1": {"hostname": "foobar"}}

ac_svc.update_status(
tk1_lv_ac_db,
ac_const.LIVEACTION_STATUS_FAILED,
result=result,
publish=False
)

tk1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk1_ex_db.id))[0]
tk1_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk1_ac_ex_db.liveaction['id'])
self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED)
self.assertDictEqual(tk1_lv_ac_db.result, result)

# Manually handle action execution completion.
wf_svc.handle_action_execution_completion(tk1_ac_ex_db)

# Assert task and workflow failed.
tk1_ex_db = wf_db_access.TaskExecution.get_by_id(tk1_ex_db.id)
self.assertEqual(tk1_ex_db.status, wf_statuses.FAILED)
wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(wf_ex_db.id)
self.assertEqual(wf_ex_db.status, wf_statuses.FAILED)

# Assert result is included in the error log.
expected_errors = [
{
'message': 'Execution failed. See result for details.',
'type': 'error',
'task_id': 'task1',
'result': {
'127.0.0.1': {
'hostname': 'foobar'
}
}
}
]

self.assertListEqual(wf_ex_db.errors, expected_errors)
2 changes: 1 addition & 1 deletion st2common/st2common/models/db/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class WorkflowExecutionDB(stormbase.StormFoundationDB, stormbase.ChangeRevisionF
state = stormbase.EscapedDictField()
status = me.StringField(required=True)
output = stormbase.EscapedDictField()
errors = me.DynamicField()
errors = stormbase.EscapedDynamicField()
start_timestamp = db_field_types.ComplexDateTimeField(default=date_utils.get_datetime_utc_now)
end_timestamp = db_field_types.ComplexDateTimeField()

Expand Down
71 changes: 32 additions & 39 deletions st2common/st2common/util/mongoescape.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,70 +19,63 @@

from st2common.util.ujson import fast_deepcopy

# Note: Because of old rule escaping code, two different characters can be translated back to dot
RULE_CRITERIA_UNESCAPED = ['.']
RULE_CRITERIA_ESCAPED = [u'\u2024']
RULE_CRITERIA_ESCAPE_TRANSLATION = dict(list(zip(RULE_CRITERIA_UNESCAPED, RULE_CRITERIA_ESCAPED)))
RULE_CRITERIA_UNESCAPE_TRANSLATION = dict(list(zip(RULE_CRITERIA_ESCAPED, RULE_CRITERIA_UNESCAPED)))

# http://docs.mongodb.org/manual/faq/developers/#faq-dollar-sign-escaping
UNESCAPED = ['.', '$']
ESCAPED = [u'\uFF0E', u'\uFF04']
ESCAPE_TRANSLATION = dict(list(zip(UNESCAPED, ESCAPED)))
UNESCAPE_TRANSLATION = dict(list(zip(ESCAPED, UNESCAPED)))
UNESCAPE_TRANSLATION = dict(
list(zip(ESCAPED, UNESCAPED)) + list(zip(RULE_CRITERIA_ESCAPED, RULE_CRITERIA_UNESCAPED))
)

# Note: Because of old rule escaping code, two different characters can be translated back to dot
RULE_CRITERIA_UNESCAPED = ['.']
RULE_CRITERIA_ESCAPED = [u'\u2024']
RULE_CRITERIA_ESCAPE_TRANSLATION = dict(list(zip(RULE_CRITERIA_UNESCAPED,
RULE_CRITERIA_ESCAPED)))
RULE_CRITERIA_UNESCAPE_TRANSLATION = dict(list(zip(RULE_CRITERIA_ESCAPED,
RULE_CRITERIA_UNESCAPED)))

def _translate_chars(field, translation):
if isinstance(field, list):
return _translate_chars_in_list(field, translation)

def _prep_work_items(d):
return [(k, v, d) for k, v in six.iteritems(d)]
if isinstance(field, dict):
return _translate_chars_in_dict(field, translation)

return field

def _translate_chars(field, translation):
# Only translate the fields of a dict
if not isinstance(field, dict):
return field

work_items = _prep_work_items(field)
def _translate_chars_in_list(field, translation):
return [_translate_chars(value, translation) for value in field]

while len(work_items) > 0:
work_item = work_items.pop(0)
oldkey = work_item[0]
value = work_item[1]
work_field = work_item[2]
newkey = oldkey

for t_k, t_v in six.iteritems(translation):
if t_k in newkey:
newkey = newkey.replace(t_k, t_v)
def _translate_chars_in_key(key, translation):
for k, v in six.iteritems(translation):
if k in key:
key = key.replace(k, v)

if newkey != oldkey:
work_field[newkey] = value
del work_field[oldkey]
return key

if isinstance(value, dict):
work_items.extend(_prep_work_items(value))
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
work_items.extend(_prep_work_items(item))

return field
def _translate_chars_in_dict(field, translation):
return {
_translate_chars_in_key(k, translation): _translate_chars(v, translation)
for k, v in six.iteritems(field)
}


def escape_chars(field):
if not isinstance(field, dict):
if not isinstance(field, dict) and not isinstance(field, list):
return field

value = fast_deepcopy(field)

return _translate_chars(value, ESCAPE_TRANSLATION)


def unescape_chars(field):
if not isinstance(field, dict):
if not isinstance(field, dict) and not isinstance(field, list):
return field

value = fast_deepcopy(field)
translated = _translate_chars(value, UNESCAPE_TRANSLATION)
translated = _translate_chars(value, RULE_CRITERIA_UNESCAPE_TRANSLATION)
return translated

return _translate_chars(value, UNESCAPE_TRANSLATION)
21 changes: 21 additions & 0 deletions st2common/tests/unit/test_mongoescape.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,24 @@ def test_complex(self):

unescaped = mongoescape.unescape_chars(escaped)
self.assertDictEqual(field, unescaped)

def test_complex_list(self):
field = [
{'k1.k2': [{'l1.l2': '123'}, {'l3.l4': '456'}]},
{'k3': [{'l5.l6': '789'}]},
{'k4.k5': [1, 2, 3]},
{'k6': ['a', 'b']}
]

expected = [
{u'k1\uff0ek2': [{u'l1\uff0el2': '123'}, {u'l3\uff0el4': '456'}]},
{'k3': [{u'l5\uff0el6': '789'}]},
{u'k4\uff0ek5': [1, 2, 3]},
{'k6': ['a', 'b']}
]

escaped = mongoescape.escape_chars(field)
self.assertListEqual(expected, escaped)

unescaped = mongoescape.unescape_chars(escaped)
self.assertListEqual(field, unescaped)