diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 5337eb1ea3..b37da70ea5 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -4,6 +4,12 @@ Changelog in development -------------- +Fixed +~~~~~ + +* Fix the workflow execution cancelation to proceed even if the workflow execution is not found or + completed. (bug fix) #4735 + 3.1.0 - June 27, 2019 --------------------- diff --git a/contrib/runners/orquesta_runner/orquesta_runner/orquesta_runner.py b/contrib/runners/orquesta_runner/orquesta_runner/orquesta_runner.py index 1372dacac1..47f77bb0ee 100644 --- a/contrib/runners/orquesta_runner/orquesta_runner/orquesta_runner.py +++ b/contrib/runners/orquesta_runner/orquesta_runner/orquesta_runner.py @@ -14,6 +14,8 @@ from __future__ import absolute_import +import sys +import traceback import uuid import six @@ -24,6 +26,7 @@ from st2common.constants import action as ac_const from st2common import log as logging +from st2common.exceptions import workflow as wf_svc_exc from st2common.models.api import notification as notify_api_models from st2common.persistence import execution as ex_db_access from st2common.persistence import liveaction as lv_db_access @@ -198,8 +201,29 @@ def task_cancelable(ac_ex): return wf_ex_cancelable or ac_ex_cancelable def cancel(self): - # Cancel the target workflow. - wf_svc.request_cancellation(self.execution) + result = None + + # Try to cancel the target workflow execution. + try: + wf_svc.request_cancellation(self.execution) + # If workflow execution is not found because the action execution is cancelled + # before the workflow execution is created or if the workflow execution is + # already completed, then ignore the exception and proceed with cancellation. + except (wf_svc_exc.WorkflowExecutionNotFoundException, + wf_svc_exc.WorkflowExecutionIsCompletedException): + pass + # If there is an unknown exception, then log the error. Continue with the + # cancelation sequence below to cancel children and determine final status. 
+ # If we rethrow the exception here, the workflow will be stuck in a canceling + # state with no options for user to clean up. It is safer to continue with + # the cancel than to revert back to some other statuses because the workflow + # execution will be in an unknown state. + except Exception: + _, ex, tb = sys.exc_info() + msg = 'Error encountered when canceling workflow execution. %s' + result = {'error': msg % str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))} + msg = '[%s] Error encountered when canceling workflow execution.' + LOG.exception(msg, str(self.execution.id)) # Request cancellation of tasks that are workflows and still running. for child_ex_id in self.execution.children: @@ -218,7 +242,7 @@ def cancel(self): return ( status, - self.liveaction.result, + result if result else self.liveaction.result, self.liveaction.context ) diff --git a/contrib/runners/orquesta_runner/tests/unit/test_cancel.py b/contrib/runners/orquesta_runner/tests/unit/test_cancel.py index 2e4d6a5c49..58383fb9bf 100644 --- a/contrib/runners/orquesta_runner/tests/unit/test_cancel.py +++ b/contrib/runners/orquesta_runner/tests/unit/test_cancel.py @@ -18,6 +18,7 @@ import st2tests +from orquesta import statuses as wf_ex_statuses from oslo_config import cfg # XXX: actionsensor import depends on config being setup. @@ -235,3 +236,64 @@ def test_cancel_subworkflow_cascade_up_to_workflow_with_other_subworkflows(self) # Assert the main workflow is canceling. 
lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id)) self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELED) + + def test_cancel_before_wf_ex_db_created(self): + wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'sequential.yaml') + lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name']) + lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db) + lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id)) + self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result) + + # Delete the workflow execution to mock issue where the record has not been created yet. + wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))[0] + wf_db_access.WorkflowExecution.delete(wf_ex_db, publish=False, dispatch_trigger=False) + + # Cancel the action execution. + requester = cfg.CONF.system_user.user + lv_ac_db, ac_ex_db = ac_svc.request_cancellation(lv_ac_db, requester) + lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id)) + self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELED) + + def test_cancel_after_wf_ex_db_completed(self): + wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'sequential.yaml') + lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name']) + lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db) + lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id)) + self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result) + + # Update the workflow execution to mock issue where the workflow is already completed + # but the liveaction and action execution have not had time to be updated. + wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))[0] + wf_ex_db.status = wf_ex_statuses.SUCCEEDED + wf_ex_db = wf_db_access.WorkflowExecution.update(wf_ex_db, publish=False) + + # Cancel the action execution. 
+ requester = cfg.CONF.system_user.user + lv_ac_db, ac_ex_db = ac_svc.request_cancellation(lv_ac_db, requester) + lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id)) + self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELED) + + @mock.patch.object( + wf_svc, 'request_cancellation', + mock.MagicMock(side_effect=Exception('foobar'))) + def test_cancel_unexpected_exception(self): + wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'sequential.yaml') + lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name']) + lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db) + lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id)) + self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result) + + # Cancel the action execution. + requester = cfg.CONF.system_user.user + lv_ac_db, ac_ex_db = ac_svc.request_cancellation(lv_ac_db, requester) + lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id)) + + # Make sure request cancellation is called. + self.assertTrue(wf_svc.request_cancellation.called) + + # Make sure the live action and action execution still have a canceled + # status despite the cancelation failure. The other option would be + # to raise an exception and the records will be stuck in a canceling + # status and user is unable to easily clean up. + self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELED) + self.assertIn('Error encountered when canceling', lv_ac_db.result.get('error', ''))