Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions contrib/runners/orquesta_runner/tests/unit/test_error_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from st2common.persistence import execution as ex_db_access
from st2common.persistence import liveaction as lv_db_access
from st2common.persistence import workflow as wf_db_access
from st2common.runners import utils as runners_utils
from st2common.services import action as ac_svc
from st2common.services import workflows as wf_svc
from st2common.transport import liveaction as lv_ac_xport
Expand Down Expand Up @@ -826,3 +827,69 @@ def test_fail_manually_with_recovery_failure(self):
]

self.assertListEqual(self.sort_workflow_errors(wf_ex_db.errors), expected_errors)

@mock.patch.object(
runners_utils,
'invoke_post_run',
mock.MagicMock(return_value=None))
def test_include_result_to_error_log(self):
username = 'stanley'
wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'sequential.yaml')
wf_input = {'who': 'Thanos'}
lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'], parameters=wf_input)
lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)

# Assert action execution is running.
lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result)
wf_ex_dbs = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))
wf_ex_db = wf_ex_dbs[0]

# Assert task1 is already completed.
query_filters = {'workflow_execution': str(wf_ex_db.id), 'task_id': 'task1'}
tk1_ex_db = wf_db_access.TaskExecution.query(**query_filters)[0]
tk1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk1_ex_db.id))[0]
tk1_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk1_ac_ex_db.liveaction['id'])
self.assertEqual(tk1_lv_ac_db.context.get('user'), username)
self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)

# Manually override and fail the action execution and write some result.
# Action execution result can contain dotted notation so ensure this is tested.
result = {"127.0.0.1": {"hostname": "foobar"}}

ac_svc.update_status(
tk1_lv_ac_db,
ac_const.LIVEACTION_STATUS_FAILED,
result=result,
publish=False
)

tk1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk1_ex_db.id))[0]
tk1_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk1_ac_ex_db.liveaction['id'])
self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED)
self.assertDictEqual(tk1_lv_ac_db.result, result)

# Manually handle action execution completion.
wf_svc.handle_action_execution_completion(tk1_ac_ex_db)

# Assert task and workflow failed.
tk1_ex_db = wf_db_access.TaskExecution.get_by_id(tk1_ex_db.id)
self.assertEqual(tk1_ex_db.status, wf_statuses.FAILED)
wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(wf_ex_db.id)
self.assertEqual(wf_ex_db.status, wf_statuses.FAILED)

# Assert result is included in the error log.
expected_errors = [
{
'message': 'Execution failed. See result for details.',
'type': 'error',
'task_id': 'task1',
'result': {
'127.0.0.1': {
'hostname': 'foobar'
}
}
}
]

self.assertListEqual(wf_ex_db.errors, expected_errors)
1 change: 1 addition & 0 deletions fixed-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ cryptography==2.8
retrying==1.3.3
# Note: We use latest version of virtualenv which uses pip 19
virtualenv==16.6.0
singledispatch==3.4.0.3
# NOTE: sseclient has various issues which sometimes hang the connection for a long time, etc.
sseclient-py==1.7
python-editor==1.0.4
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ requests[security]==2.23.0
retrying==1.3.3
routes==2.4.1
semver==2.9.0
singledispatch==3.4.0.3
six==1.13.0
sseclient-py==1.7
stevedore==1.30.1
Expand Down
1 change: 1 addition & 0 deletions st2common/in-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ ujson
# Note: amqp is used by kombu, this needs to be added here to be picked up by
# requirements fixate script.
amqp
singledispatch
# Used by st2-pack-* commands
gitpython
lockfile
1 change: 1 addition & 0 deletions st2common/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ requests[security]==2.23.0
retrying==1.3.3
routes==2.4.1
semver==2.9.0
singledispatch==3.4.0.3
six==1.13.0
tooz==1.66.1
ujson==1.35
Expand Down
2 changes: 1 addition & 1 deletion st2common/st2common/models/db/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class WorkflowExecutionDB(stormbase.StormFoundationDB, stormbase.ChangeRevisionF
state = stormbase.EscapedDictField()
status = me.StringField(required=True)
output = stormbase.EscapedDictField()
errors = me.DynamicField()
errors = stormbase.EscapedDynamicField()
start_timestamp = db_field_types.ComplexDateTimeField(default=date_utils.get_datetime_utc_now)
end_timestamp = db_field_types.ComplexDateTimeField()

Expand Down
86 changes: 35 additions & 51 deletions st2common/st2common/util/mongoescape.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,75 +14,59 @@

from __future__ import absolute_import

try: # Python 3
from functools import singledispatch
except ImportError: # Python 2
from singledispatch import singledispatch

import six
from six.moves import zip

from st2common.util.ujson import fast_deepcopy

# Note: Because of old rule escaping code, two different characters can be translated back to dot
RULE_CRITERIA_UNESCAPED = ['.']
RULE_CRITERIA_ESCAPED = [u'\u2024']
RULE_CRITERIA_ESCAPE_TRANSLATION = dict(list(zip(RULE_CRITERIA_UNESCAPED, RULE_CRITERIA_ESCAPED)))
RULE_CRITERIA_UNESCAPE_TRANSLATION = dict(list(zip(RULE_CRITERIA_ESCAPED, RULE_CRITERIA_UNESCAPED)))

# http://docs.mongodb.org/manual/faq/developers/#faq-dollar-sign-escaping
UNESCAPED = ['.', '$']
ESCAPED = [u'\uFF0E', u'\uFF04']
ESCAPE_TRANSLATION = dict(list(zip(UNESCAPED, ESCAPED)))
UNESCAPE_TRANSLATION = dict(list(zip(ESCAPED, UNESCAPED)))
UNESCAPE_TRANSLATION = dict(
list(zip(ESCAPED, UNESCAPED)) + list(zip(RULE_CRITERIA_ESCAPED, RULE_CRITERIA_UNESCAPED))
)

# Note: Because of old rule escaping code, two different characters can be translated back to dot
RULE_CRITERIA_UNESCAPED = ['.']
RULE_CRITERIA_ESCAPED = [u'\u2024']
RULE_CRITERIA_ESCAPE_TRANSLATION = dict(list(zip(RULE_CRITERIA_UNESCAPED,
RULE_CRITERIA_ESCAPED)))
RULE_CRITERIA_UNESCAPE_TRANSLATION = dict(list(zip(RULE_CRITERIA_ESCAPED,
RULE_CRITERIA_UNESCAPED)))

@singledispatch
def _translate_chars(field, translation):
return field

def _prep_work_items(d):
return [(k, v, d) for k, v in six.iteritems(d)]

@_translate_chars.register(list)
def _translate_chars_in_list(field, translation):
return [_translate_chars(value, translation) for value in field]

def _translate_chars(field, translation):
# Only translate the fields of a dict
if not isinstance(field, dict):
return field

work_items = _prep_work_items(field)

while len(work_items) > 0:
work_item = work_items.pop(0)
oldkey = work_item[0]
value = work_item[1]
work_field = work_item[2]
newkey = oldkey

for t_k, t_v in six.iteritems(translation):
if t_k in newkey:
newkey = newkey.replace(t_k, t_v)

if newkey != oldkey:
work_field[newkey] = value
del work_field[oldkey]

if isinstance(value, dict):
work_items.extend(_prep_work_items(value))
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
work_items.extend(_prep_work_items(item))

return field
def _translate_chars_in_key(key, translation):
for k, v in [(k, v) for k, v in six.iteritems(translation) if k in key]:
key = key.replace(k, v)

return key

def escape_chars(field):
if not isinstance(field, dict):
return field

value = fast_deepcopy(field)
return _translate_chars(value, ESCAPE_TRANSLATION)
@_translate_chars.register(dict)
def _translate_chars_in_dict(field, translation):
return {
_translate_chars_in_key(k, translation): _translate_chars(v, translation)
for k, v in six.iteritems(field)
}


def unescape_chars(field):
if not isinstance(field, dict):
return field
def escape_chars(field):
return _translate_chars(fast_deepcopy(field), ESCAPE_TRANSLATION)

value = fast_deepcopy(field)
translated = _translate_chars(value, UNESCAPE_TRANSLATION)
translated = _translate_chars(value, RULE_CRITERIA_UNESCAPE_TRANSLATION)
return translated

def unescape_chars(field):
return _translate_chars(fast_deepcopy(field), UNESCAPE_TRANSLATION)