From 5eb319ddf6079d029dd99f331b65d82e92647d1c Mon Sep 17 00:00:00 2001 From: W Chan Date: Sat, 21 Mar 2020 21:53:26 +0000 Subject: [PATCH 1/3] Rollback workflow DB change on errors field Rollback workflow DB change on errors field because there are cases where action execution result is included in the detail of the error log and the action execution result can contain dictionary with key that has dot notation such as IP address. Also identified and fix mongoescape bug on handling list of dicts in dynamic field and also multiple iteration on unescape. --- .../tests/unit/test_error_handling.py | 67 ++++++++++++++++++ st2common/st2common/models/db/workflow.py | 2 +- st2common/st2common/util/mongoescape.py | 70 ++++++++----------- 3 files changed, 99 insertions(+), 40 deletions(-) diff --git a/contrib/runners/orquesta_runner/tests/unit/test_error_handling.py b/contrib/runners/orquesta_runner/tests/unit/test_error_handling.py index 31a17b593c1..361271b78ba 100644 --- a/contrib/runners/orquesta_runner/tests/unit/test_error_handling.py +++ b/contrib/runners/orquesta_runner/tests/unit/test_error_handling.py @@ -34,6 +34,7 @@ from st2common.persistence import execution as ex_db_access from st2common.persistence import liveaction as lv_db_access from st2common.persistence import workflow as wf_db_access +from st2common.runners import utils as runners_utils from st2common.services import action as ac_svc from st2common.services import workflows as wf_svc from st2common.transport import liveaction as lv_ac_xport @@ -826,3 +827,69 @@ def test_fail_manually_with_recovery_failure(self): ] self.assertListEqual(self.sort_workflow_errors(wf_ex_db.errors), expected_errors) + + @mock.patch.object( + runners_utils, + 'invoke_post_run', + mock.MagicMock(return_value=None)) + def test_include_result_to_error_log(self): + username = 'stanley' + wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'sequential.yaml') + wf_input = {'who': 'Thanos'} + lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'], parameters=wf_input) + lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db) + + # Assert action execution is running. + lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id)) + self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result) + wf_ex_dbs = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id)) + wf_ex_db = wf_ex_dbs[0] + + # Assert task1 is already completed. + query_filters = {'workflow_execution': str(wf_ex_db.id), 'task_id': 'task1'} + tk1_ex_db = wf_db_access.TaskExecution.query(**query_filters)[0] + tk1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk1_ex_db.id))[0] + tk1_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk1_ac_ex_db.liveaction['id']) + self.assertEqual(tk1_lv_ac_db.context.get('user'), username) + self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED) + + # Manually override and fail the action execution and write some result. + # Action execution result can contain dotted notation so ensure this is tested. + result = {"127.0.0.1": {"hostname": "foobar"}} + + ac_svc.update_status( + tk1_lv_ac_db, + ac_const.LIVEACTION_STATUS_FAILED, + result=result, + publish=False + ) + + tk1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk1_ex_db.id))[0] + tk1_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk1_ac_ex_db.liveaction['id']) + self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED) + self.assertDictEqual(tk1_lv_ac_db.result, result) + + # Manually handle action execution completion. + wf_svc.handle_action_execution_completion(tk1_ac_ex_db) + + # Assert task and workflow failed. + tk1_ex_db = wf_db_access.TaskExecution.get_by_id(tk1_ex_db.id) + self.assertEqual(tk1_ex_db.status, wf_statuses.FAILED) + wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(wf_ex_db.id) + self.assertEqual(wf_ex_db.status, wf_statuses.FAILED) + + # Assert result is included in the error log. + expected_errors = [ + { + 'message': 'Execution failed. See result for details.', + 'type': 'error', + 'task_id': 'task1', + 'result': { + '127.0.0.1': { + 'hostname': 'foobar' + } + } + } + ] + + self.assertListEqual(wf_ex_db.errors, expected_errors) diff --git a/st2common/st2common/models/db/workflow.py b/st2common/st2common/models/db/workflow.py index 7c0c51c1f2b..b642df8eb88 100644 --- a/st2common/st2common/models/db/workflow.py +++ b/st2common/st2common/models/db/workflow.py @@ -44,7 +44,7 @@ class WorkflowExecutionDB(stormbase.StormFoundationDB, stormbase.ChangeRevisionF state = stormbase.EscapedDictField() status = me.StringField(required=True) output = stormbase.EscapedDictField() - errors = me.DynamicField() + errors = stormbase.EscapedDynamicField() start_timestamp = db_field_types.ComplexDateTimeField(default=date_utils.get_datetime_utc_now) end_timestamp = db_field_types.ComplexDateTimeField() diff --git a/st2common/st2common/util/mongoescape.py b/st2common/st2common/util/mongoescape.py index a20db8f2a11..489bf374f3d 100644 --- a/st2common/st2common/util/mongoescape.py +++ b/st2common/st2common/util/mongoescape.py @@ -19,70 +19,62 @@ from st2common.util.ujson import fast_deepcopy +# Note: Because of old rule escaping code, two different characters can be translated back to dot +RULE_CRITERIA_UNESCAPED = ['.'] +RULE_CRITERIA_ESCAPED = [u'\u2024'] +RULE_CRITERIA_ESCAPE_TRANSLATION = dict(list(zip(RULE_CRITERIA_UNESCAPED, RULE_CRITERIA_ESCAPED))) +RULE_CRITERIA_UNESCAPE_TRANSLATION = dict(list(zip(RULE_CRITERIA_ESCAPED, RULE_CRITERIA_UNESCAPED))) + # http://docs.mongodb.org/manual/faq/developers/#faq-dollar-sign-escaping UNESCAPED = ['.', '$'] ESCAPED = [u'\uFF0E', u'\uFF04'] ESCAPE_TRANSLATION = dict(list(zip(UNESCAPED, ESCAPED))) -UNESCAPE_TRANSLATION = dict(list(zip(ESCAPED, UNESCAPED))) +UNESCAPE_TRANSLATION = dict( + list(zip(ESCAPED, UNESCAPED)) + list(zip(RULE_CRITERIA_ESCAPED, RULE_CRITERIA_UNESCAPED)) +) -# Note: Because of old rule escaping code, two different characters can be translated back to dot -RULE_CRITERIA_UNESCAPED = ['.'] -RULE_CRITERIA_ESCAPED = [u'\u2024'] -RULE_CRITERIA_ESCAPE_TRANSLATION = dict(list(zip(RULE_CRITERIA_UNESCAPED, - RULE_CRITERIA_ESCAPED))) -RULE_CRITERIA_UNESCAPE_TRANSLATION = dict(list(zip(RULE_CRITERIA_ESCAPED, - RULE_CRITERIA_UNESCAPED))) +def _translate_chars(field, translation): + if isinstance(field, list): + return _translate_chars_in_list(field, translation) -def _prep_work_items(d): - return [(k, v, d) for k, v in six.iteritems(d)] + if isinstance(field, dict): + return _translate_chars_in_dict(field, translation) + return field -def _translate_chars(field, translation): - # Only translate the fields of a dict - if not isinstance(field, dict): - return field - work_items = _prep_work_items(field) +def _translate_chars_in_list(field, translation): + return [_translate_chars(value, translation) for value in field] - while len(work_items) > 0: - work_item = work_items.pop(0) - oldkey = work_item[0] - value = work_item[1] - work_field = work_item[2] - newkey = oldkey - for t_k, t_v in six.iteritems(translation): - if t_k in newkey: - newkey = newkey.replace(t_k, t_v) +def _translate_chars_in_key(key, translation): + for k, v in [(k, v) for k, v in six.iteritems(translation) if k in key]: + key = key.replace(k, v) - if newkey != oldkey: - work_field[newkey] = value - del work_field[oldkey] + return key - if isinstance(value, dict): - work_items.extend(_prep_work_items(value)) - elif isinstance(value, list): - for item in value: - if isinstance(item, dict): - work_items.extend(_prep_work_items(item)) - return field +def _translate_chars_in_dict(field, translation): + return { + _translate_chars_in_key(k, translation): _translate_chars(v, translation) + for k, v in six.iteritems(field) + } def escape_chars(field): - if not isinstance(field, dict): + if not isinstance(field, dict) and not isinstance(field, list): return field value = fast_deepcopy(field) + return _translate_chars(value, ESCAPE_TRANSLATION) def unescape_chars(field): - if not isinstance(field, dict): + if not isinstance(field, dict) and not isinstance(field, list): return field value = fast_deepcopy(field) - translated = _translate_chars(value, UNESCAPE_TRANSLATION) - translated = _translate_chars(value, RULE_CRITERIA_UNESCAPE_TRANSLATION) - return translated + + return _translate_chars(value, UNESCAPE_TRANSLATION) From af0794c7f9b28e8ffd22e18abff882325cf7ff5f Mon Sep 17 00:00:00 2001 From: blag Date: Mon, 23 Mar 2020 19:20:13 -0700 Subject: [PATCH 2/3] Implement _translate_chars with singledispatch --- st2common/st2common/util/mongoescape.py | 28 +++++++++---------------- 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/st2common/st2common/util/mongoescape.py b/st2common/st2common/util/mongoescape.py index 489bf374f3d..16012485a37 100644 --- a/st2common/st2common/util/mongoescape.py +++ b/st2common/st2common/util/mongoescape.py @@ -14,6 +14,11 @@ from __future__ import absolute_import +try: # Python 3 + from functools import singledispatch +except ImportError: # Python 2 + from singledispatch import singledispatch + import six from six.moves import zip @@ -34,16 +39,12 @@ ) +@singledispatch def _translate_chars(field, translation): - if isinstance(field, list): - return _translate_chars_in_list(field, translation) - - if isinstance(field, dict): - return _translate_chars_in_dict(field, translation) - return field +@_translate_chars.register(list) def _translate_chars_in_list(field, translation): return [_translate_chars(value, translation) for value in field] @@ -55,6 +56,7 @@ def _translate_chars_in_key(key, translation): return key +@_translate_chars.register(dict) def _translate_chars_in_dict(field, translation): return { _translate_chars_in_key(k, translation): _translate_chars(v, translation) @@ -63,18 +65,8 @@ def _translate_chars_in_dict(field, translation): def escape_chars(field): - if not isinstance(field, dict) and not isinstance(field, list): - return field - - value = fast_deepcopy(field) - - return _translate_chars(value, ESCAPE_TRANSLATION) + return _translate_chars(fast_deepcopy(field), ESCAPE_TRANSLATION) def unescape_chars(field): - if not isinstance(field, dict) and not isinstance(field, list): - return field - - value = fast_deepcopy(field) - - return _translate_chars(value, UNESCAPE_TRANSLATION) + return _translate_chars(fast_deepcopy(field), UNESCAPE_TRANSLATION) From ccf835658c7b15b8e1d92acdbaa606641acbcc7c Mon Sep 17 00:00:00 2001 From: blag Date: Mon, 23 Mar 2020 20:56:53 -0700 Subject: [PATCH 3/3] Use the singledispatch package for Python < 3.4 --- fixed-requirements.txt | 1 + requirements.txt | 1 + st2common/in-requirements.txt | 1 + st2common/requirements.txt | 1 + 4 files changed, 4 insertions(+) diff --git a/fixed-requirements.txt b/fixed-requirements.txt index af2c1baf051..730f5af8ce0 100644 --- a/fixed-requirements.txt +++ b/fixed-requirements.txt @@ -35,6 +35,7 @@ cryptography==2.8 retrying==1.3.3 # Note: We use latest version of virtualenv which uses pip 19 virtualenv==16.6.0 +singledispatch==3.4.0.3 # NOTE: sseclient has various issues which sometimes hang the connection for a long time, etc. sseclient-py==1.7 python-editor==1.0.4 diff --git a/requirements.txt b/requirements.txt index 3898401f16c..dc9723f37bf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -56,6 +56,7 @@ requests[security]==2.23.0 retrying==1.3.3 routes==2.4.1 semver==2.9.0 +singledispatch==3.4.0.3 six==1.13.0 sseclient-py==1.7 stevedore==1.30.1 diff --git a/st2common/in-requirements.txt b/st2common/in-requirements.txt index 2871a4d4a77..f7af82a73e7 100644 --- a/st2common/in-requirements.txt +++ b/st2common/in-requirements.txt @@ -33,6 +33,7 @@ ujson # Note: amqp is used by kombu, this needs to be added here to be picked up by # requirements fixate script. amqp +singledispatch # Used by st2-pack-* commands gitpython lockfile diff --git a/st2common/requirements.txt b/st2common/requirements.txt index 73ee717aeb8..fce1db1ef05 100644 --- a/st2common/requirements.txt +++ b/st2common/requirements.txt @@ -31,6 +31,7 @@ requests[security]==2.23.0 retrying==1.3.3 routes==2.4.1 semver==2.9.0 +singledispatch==3.4.0.3 six==1.13.0 tooz==1.66.1 ujson==1.35