diff --git a/contrib/runners/orquesta_runner/tests/unit/test_error_handling.py b/contrib/runners/orquesta_runner/tests/unit/test_error_handling.py index d06d335993..436fe38fea 100644 --- a/contrib/runners/orquesta_runner/tests/unit/test_error_handling.py +++ b/contrib/runners/orquesta_runner/tests/unit/test_error_handling.py @@ -29,6 +29,7 @@ from tests.unit import base +from local_runner import local_shell_command_runner from st2common.bootstrap import actionsregistrar from st2common.bootstrap import runnersregistrar from st2common.constants import action as ac_const @@ -59,6 +60,12 @@ st2tests.fixturesloader.get_fixtures_packs_base_path() + "/core", ] +RUNNER_RESULT_FAILED = ( + ac_const.LIVEACTION_STATUS_FAILED, + {"127.0.0.1": {"hostname": "foobar"}}, + {}, +) + @mock.patch.object( publishers.CUDPublisher, "publish_update", mock.MagicMock(return_value=None) @@ -954,6 +961,11 @@ def test_fail_manually_with_recovery_failure(self): @mock.patch.object( runners_utils, "invoke_post_run", mock.MagicMock(return_value=None) ) + @mock.patch.object( + local_shell_command_runner.LocalShellCommandRunner, + "run", + mock.MagicMock(side_effect=[RUNNER_RESULT_FAILED]), + ) def test_include_result_to_error_log(self): username = "stanley" wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, "sequential.yaml") @@ -981,24 +993,11 @@ def test_include_result_to_error_log(self): )[0] tk1_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk1_ac_ex_db.liveaction["id"]) self.assertEqual(tk1_lv_ac_db.context.get("user"), username) - self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED) + self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED) - # Manually override and fail the action execution and write some result. # Action execution result can contain dotted notation so ensure this is tested. result = {"127.0.0.1": {"hostname": "foobar"}} - ac_svc.update_status( - tk1_lv_ac_db, - ac_const.LIVEACTION_STATUS_FAILED, - result=result, - publish=False, - ) - - tk1_ac_ex_db = ex_db_access.ActionExecution.query( - task_execution=str(tk1_ex_db.id) - )[0] - tk1_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk1_ac_ex_db.liveaction["id"]) - self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED) self.assertDictEqual(tk1_lv_ac_db.result, result) # Manually handle action execution completion. diff --git a/contrib/runners/orquesta_runner/tests/unit/test_policies.py b/contrib/runners/orquesta_runner/tests/unit/test_policies.py index 81ab639262..c8b836c53d 100644 --- a/contrib/runners/orquesta_runner/tests/unit/test_policies.py +++ b/contrib/runners/orquesta_runner/tests/unit/test_policies.py @@ -29,6 +29,7 @@ from tests.unit import base import st2common +from local_runner import local_shell_command_runner from st2actions.notifier import notifier from st2actions.workflows import workflows from st2common.bootstrap import actionsregistrar @@ -57,6 +58,8 @@ st2tests.fixturesloader.get_fixtures_packs_base_path() + "/core", ] +RUNNER_RESULT_FAILED = (ac_const.LIVEACTION_STATUS_FAILED, {"stderror": "..."}, {}) + @mock.patch.object( publishers.CUDPublisher, "publish_update", mock.MagicMock(return_value=None) @@ -114,6 +117,11 @@ def tearDown(self): for ac_ex_db in ex_db_access.ActionExecution.get_all(): ac_ex_db.delete() + @mock.patch.object( + local_shell_command_runner.LocalShellCommandRunner, + "run", + mock.MagicMock(side_effect=[RUNNER_RESULT_FAILED]), + ) def test_retry_policy_applied_on_workflow_failure(self): wf_name = "sequential" wf_ac_ref = TEST_PACK + "." + wf_name @@ -136,16 +144,11 @@ def test_retry_policy_applied_on_workflow_failure(self): workflow_execution=str(wf_ex_db.id) )[0] t1_lv_ac_db = lv_db_access.LiveAction.query(task_execution=str(t1_ex_db.id))[0] + self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED) t1_ac_ex_db = ex_db_access.ActionExecution.query( task_execution=str(t1_ex_db.id) )[0] - # Manually set the status to fail. - ac_svc.update_status(t1_lv_ac_db, ac_const.LIVEACTION_STATUS_FAILED) - t1_lv_ac_db = lv_db_access.LiveAction.query(task_execution=str(t1_ex_db.id))[0] - t1_ac_ex_db = ex_db_access.ActionExecution.query( - task_execution=str(t1_ex_db.id) - )[0] self.assertEqual(t1_ac_ex_db.status, ac_const.LIVEACTION_STATUS_FAILED) notifier.get_notifier().process(t1_ac_ex_db) workflows.get_engine().process(t1_ac_ex_db) @@ -158,6 +161,11 @@ def test_retry_policy_applied_on_workflow_failure(self): # Ensure execution is retried. self.assertEqual(len(lv_db_access.LiveAction.query(action=wf_ac_ref)), 2) + @mock.patch.object( + local_shell_command_runner.LocalShellCommandRunner, + "run", + mock.MagicMock(side_effect=[RUNNER_RESULT_FAILED]), + ) def test_no_retry_policy_applied_on_task_failure(self): wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, "subworkflow.yaml") lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta["name"]) @@ -197,7 +205,7 @@ def test_no_retry_policy_applied_on_task_failure(self): t1_t1_lv_ac_db = lv_db_access.LiveAction.query( task_execution=str(t1_t1_ex_db.id) )[0] - ac_svc.update_status(t1_t1_lv_ac_db, ac_const.LIVEACTION_STATUS_FAILED) + self.assertEqual(t1_t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED) t1_t1_ac_ex_db = ex_db_access.ActionExecution.query( task_execution=str(t1_t1_ex_db.id) )[0] diff --git a/contrib/runners/orquesta_runner/tests/unit/test_rerun.py b/contrib/runners/orquesta_runner/tests/unit/test_rerun.py index 191f3a0681..ca972f2d2d 100644 --- a/contrib/runners/orquesta_runner/tests/unit/test_rerun.py +++ b/contrib/runners/orquesta_runner/tests/unit/test_rerun.py @@ -33,7 +33,6 @@ from st2common.persistence import liveaction as lv_db_access from st2common.persistence import workflow as wf_db_access from st2common.services import action as action_service -from st2common.services import executions as execution_service from st2common.services import workflows as workflow_service from st2common.transport import liveaction as lv_ac_xport from st2common.transport import workflow as wf_ex_xport @@ -52,7 +51,11 @@ st2tests.fixturesloader.get_fixtures_packs_base_path() + "/core", ] -RUNNER_RESULT_FAILED = (action_constants.LIVEACTION_STATUS_FAILED, {}, {}) +RUNNER_RESULT_FAILED = ( + action_constants.LIVEACTION_STATUS_FAILED, + {"stderror": "..."}, + {}, +) RUNNER_RESULT_RUNNING = ( action_constants.LIVEACTION_STATUS_RUNNING, {"stdout": "..."}, @@ -216,10 +219,15 @@ def test_rerun_with_missing_workflow_execution_id(self): # Delete the workflow execution. wf_db_access.WorkflowExecution.delete(wf_ex_db, publish=False) - # Manually delete the workflow_execution_id from context of the action execution. + # Manually delete the workflow_execution_id from context of the liveaction. lv_ac_db1.context.pop("workflow_execution") lv_ac_db1 = lv_db_access.LiveAction.add_or_update(lv_ac_db1, publish=False) - ac_ex_db1 = execution_service.update_execution(lv_ac_db1, publish=False) + # Manually delete the workflow_execution_id from context of the action execution. + # We cannot use execution_service.update_execution here because by the time we reach + # execution_service.update_execution, action is already in completed state. + # Popping of workflow id and and updating the execution object will not work. + ac_ex_db1.context.pop("workflow_execution") + ac_ex_db1 = ex_db_access.ActionExecution.add_or_update(ac_ex_db1, publish=False) # Rerun the execution. context = {"re-run": {"ref": str(ac_ex_db1.id), "tasks": ["task1"]}} diff --git a/contrib/runners/orquesta_runner/tests/unit/test_with_items.py b/contrib/runners/orquesta_runner/tests/unit/test_with_items.py index 6676874586..6e16b7255d 100644 --- a/contrib/runners/orquesta_runner/tests/unit/test_with_items.py +++ b/contrib/runners/orquesta_runner/tests/unit/test_with_items.py @@ -30,6 +30,7 @@ from tests.unit import base +from local_runner import local_shell_command_runner from st2actions.workflows import workflows from st2common.bootstrap import actionsregistrar from st2common.bootstrap import runnersregistrar @@ -59,6 +60,18 @@ st2tests.fixturesloader.get_fixtures_packs_base_path() + "/core", ] +RUNNER_RESULT_RUNNING = ( + action_constants.LIVEACTION_STATUS_RUNNING, + {"stdout": "..."}, + {}, +) + +RUNNER_RESULT_SUCCEEDED = ( + action_constants.LIVEACTION_STATUS_SUCCEEDED, + {"stdout": "..."}, + {}, +) + @mock.patch.object( publishers.CUDPublisher, "publish_update", mock.MagicMock(return_value=None) @@ -315,6 +328,11 @@ def test_with_items_concurrency(self): lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id)) self.assertEqual(lv_ac_db.status, action_constants.LIVEACTION_STATUS_SUCCEEDED) + @mock.patch.object( + local_shell_command_runner.LocalShellCommandRunner, + "run", + mock.MagicMock(return_value=RUNNER_RESULT_RUNNING), + ) def test_with_items_cancellation(self): num_items = 3 @@ -340,16 +358,6 @@ def test_with_items_cancellation(self): self.assertEqual(t1_ex_db.status, wf_statuses.RUNNING) self.assertEqual(len(t1_ac_ex_dbs), num_items) - # Reset the action executions to running status. - for ac_ex in t1_ac_ex_dbs: - self.set_execution_status( - ac_ex.liveaction["id"], action_constants.LIVEACTION_STATUS_RUNNING - ) - - t1_ac_ex_dbs = ex_db_access.ActionExecution.query( - task_execution=str(t1_ex_db.id) - ) - status = [ ac_ex.status == action_constants.LIVEACTION_STATUS_RUNNING for ac_ex in t1_ac_ex_dbs @@ -389,6 +397,11 @@ def test_with_items_cancellation(self): lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id)) self.assertEqual(lv_ac_db.status, action_constants.LIVEACTION_STATUS_CANCELED) + @mock.patch.object( + local_shell_command_runner.LocalShellCommandRunner, + "run", + mock.MagicMock(return_value=RUNNER_RESULT_RUNNING), + ) def test_with_items_concurrency_cancellation(self): concurrency = 2 @@ -417,16 +430,6 @@ def test_with_items_concurrency_cancellation(self): self.assertEqual(t1_ex_db.status, wf_statuses.RUNNING) self.assertEqual(len(t1_ac_ex_dbs), concurrency) - # Reset the action executions to running status. - for ac_ex in t1_ac_ex_dbs: - self.set_execution_status( - ac_ex.liveaction["id"], action_constants.LIVEACTION_STATUS_RUNNING - ) - - t1_ac_ex_dbs = ex_db_access.ActionExecution.query( - task_execution=str(t1_ex_db.id) - ) - status = [ ac_ex.status == action_constants.LIVEACTION_STATUS_RUNNING for ac_ex in t1_ac_ex_dbs @@ -466,6 +469,11 @@ def test_with_items_concurrency_cancellation(self): lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id)) self.assertEqual(lv_ac_db.status, action_constants.LIVEACTION_STATUS_CANCELED) + @mock.patch.object( + local_shell_command_runner.LocalShellCommandRunner, + "run", + mock.MagicMock(return_value=RUNNER_RESULT_RUNNING), + ) def test_with_items_pause_and_resume(self): num_items = 3 @@ -491,16 +499,6 @@ def test_with_items_pause_and_resume(self): self.assertEqual(t1_ex_db.status, wf_statuses.RUNNING) self.assertEqual(len(t1_ac_ex_dbs), num_items) - # Reset the action executions to running status. - for ac_ex in t1_ac_ex_dbs: - self.set_execution_status( - ac_ex.liveaction["id"], action_constants.LIVEACTION_STATUS_RUNNING - ) - - t1_ac_ex_dbs = ex_db_access.ActionExecution.query( - task_execution=str(t1_ex_db.id) - ) - status = [ ac_ex.status == action_constants.LIVEACTION_STATUS_RUNNING for ac_ex in t1_ac_ex_dbs @@ -551,6 +549,17 @@ def test_with_items_pause_and_resume(self): lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id)) self.assertEqual(lv_ac_db.status, action_constants.LIVEACTION_STATUS_SUCCEEDED) + @mock.patch.object( + local_shell_command_runner.LocalShellCommandRunner, + "run", + mock.MagicMock( + side_effect=[ + RUNNER_RESULT_RUNNING, + RUNNER_RESULT_RUNNING, + RUNNER_RESULT_SUCCEEDED, + ] + ), + ) def test_with_items_concurrency_pause_and_resume(self): num_items = 3 concurrency = 2 @@ -580,16 +589,6 @@ def test_with_items_concurrency_pause_and_resume(self): self.assertEqual(t1_ex_db.status, wf_statuses.RUNNING) self.assertEqual(len(t1_ac_ex_dbs), concurrency) - # Reset the action executions to running status. - for ac_ex in t1_ac_ex_dbs: - self.set_execution_status( - ac_ex.liveaction["id"], action_constants.LIVEACTION_STATUS_RUNNING - ) - - t1_ac_ex_dbs = ex_db_access.ActionExecution.query( - task_execution=str(t1_ex_db.id) - ) - status = [ ac_ex.status == action_constants.LIVEACTION_STATUS_RUNNING for ac_ex in t1_ac_ex_dbs diff --git a/st2common/st2common/services/executions.py b/st2common/st2common/services/executions.py index 9730381ae4..fe3b7bb591 100644 --- a/st2common/st2common/services/executions.py +++ b/st2common/st2common/services/executions.py @@ -194,6 +194,15 @@ def update_execution(liveaction_db, publish=True, set_result_size=False): on the "result_size" database field. """ execution = ActionExecution.get(liveaction__id=str(liveaction_db.id)) + + # Skip execution object update when action is already in completed state. + if execution.status in action_constants.LIVEACTION_COMPLETED_STATES: + LOG.debug( + "[%s] Action is already in completed state: %s. Skipping execution update to state: %s." + % (execution.id, execution.status, liveaction_db.status) + ) + return execution + decomposed = _decompose_liveaction(liveaction_db) kw = {} diff --git a/st2common/tests/unit/test_executions_util.py b/st2common/tests/unit/test_executions_util.py index f7702614d3..80c3e77083 100644 --- a/st2common/tests/unit/test_executions_util.py +++ b/st2common/tests/unit/test_executions_util.py @@ -171,6 +171,22 @@ def test_execution_update(self): self.assertGreater(execution.log[1]["timestamp"], pre_update_timestamp) self.assertLess(execution.log[1]["timestamp"], post_update_timestamp) + def test_skip_execution_update(self): + liveaction = self.MODELS["liveactions"]["successful_liveaction.yaml"] + executions_util.create_execution_object(liveaction) + pre_update_status = liveaction.status + liveaction.status = "running" + executions_util.update_execution(liveaction) + execution = self._get_action_execution( + liveaction__id=str(liveaction.id), raise_exception=True + ) + self.assertEqual(len(execution.log), 1) + # Check status is not updated if it's already in completed state. + self.assertEqual( + pre_update_status, action_constants.LIVEACTION_STATUS_SUCCEEDED + ) + self.assertEqual(execution.log[0]["status"], pre_update_status) + @mock.patch.object(PoolPublisher, "publish", mock.MagicMock()) @mock.patch.object( runners_utils, "invoke_post_run", mock.MagicMock(return_value=None)