From 07a0866402560a41ec240b4b91c243cba6f93a2e Mon Sep 17 00:00:00 2001 From: W Chan Date: Thu, 4 Jul 2019 00:12:03 +0000 Subject: [PATCH 1/2] Allow cancel to proceed if workflow execution is not found or completed Allow the workflow execution cancelation to proceed even if the workflow execution is not found or completed otherwise the method will raise an exception and the workflow execution will be stuck with a canceling status. --- CHANGELOG.rst | 6 +++ .../orquesta_runner/orquesta_runner.py | 17 ++++++++- .../orquesta_runner/tests/unit/test_cancel.py | 37 +++++++++++++++++++ 3 files changed, 58 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 5337eb1ea3..b37da70ea5 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -4,6 +4,12 @@ Changelog in development -------------- +Fixed +~~~~~ + +* Fix the workflow execution cancelation to proceed even if the workflow execution is not found or + completed. (bug fix) #4735 + 3.1.0 - June 27, 2019 --------------------- diff --git a/contrib/runners/orquesta_runner/orquesta_runner/orquesta_runner.py b/contrib/runners/orquesta_runner/orquesta_runner/orquesta_runner.py index 1372dacac1..3a9b63979c 100644 --- a/contrib/runners/orquesta_runner/orquesta_runner/orquesta_runner.py +++ b/contrib/runners/orquesta_runner/orquesta_runner/orquesta_runner.py @@ -24,6 +24,7 @@ from st2common.constants import action as ac_const from st2common import log as logging +from st2common.exceptions import workflow as wf_svc_exc from st2common.models.api import notification as notify_api_models from st2common.persistence import execution as ex_db_access from st2common.persistence import liveaction as lv_db_access @@ -198,8 +199,20 @@ def task_cancelable(ac_ex): return wf_ex_cancelable or ac_ex_cancelable def cancel(self): - # Cancel the target workflow. - wf_svc.request_cancellation(self.execution) + # Try to cancel the target workflow execution. + try: + wf_svc.request_cancellation(self.execution) + # If workflow execution is not found because the action execution is cancelled + # before the workflow execution is created or if the workflow execution is + # already completed, then ignore the exception and proceed with cancellation. + except (wf_svc_exc.WorkflowExecutionNotFoundException, + wf_svc_exc.WorkflowExecutionIsCompletedException): + pass + # If there is an unknown exception, then log and rethrow the exception. + except Exception as e: + msg = '[%s] Unable to cancel workflow execution. %s' + LOG.error(msg, str(self.execution.id), str(e)) + raise e # Request cancellation of tasks that are workflows and still running. for child_ex_id in self.execution.children: diff --git a/contrib/runners/orquesta_runner/tests/unit/test_cancel.py b/contrib/runners/orquesta_runner/tests/unit/test_cancel.py index 2e4d6a5c49..3dd13b1339 100644 --- a/contrib/runners/orquesta_runner/tests/unit/test_cancel.py +++ b/contrib/runners/orquesta_runner/tests/unit/test_cancel.py @@ -18,6 +18,7 @@ import st2tests +from orquesta import statuses as wf_ex_statuses from oslo_config import cfg # XXX: actionsensor import depends on config being setup. @@ -235,3 +236,39 @@ def test_cancel_subworkflow_cascade_up_to_workflow_with_other_subworkflows(self) # Assert the main workflow is canceling. lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id)) self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELED) + + def test_cancel_before_wf_ex_db_created(self): + wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'sequential.yaml') + lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name']) + lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db) + lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id)) + self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result) + + # Delete the workfow execution to mock issue where the record has not been created yet. + wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))[0] + wf_db_access.WorkflowExecution.delete(wf_ex_db, publish=False, dispatch_trigger=False) + + # Cancel the action execution. + requester = cfg.CONF.system_user.user + lv_ac_db, ac_ex_db = ac_svc.request_cancellation(lv_ac_db, requester) + lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id)) + self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELED) + + def test_cancel_after_wf_ex_db_completed(self): + wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'sequential.yaml') + lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name']) + lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db) + lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id)) + self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result) + + # Delete the workfow execution to mock issue where the workflow is already completed + # but the liveaction and action execution have not had time to be updated. + wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))[0] + wf_ex_db.status = wf_ex_statuses.SUCCEEDED + wf_ex_db = wf_db_access.WorkflowExecution.update(wf_ex_db, publish=False) + + # Cancel the action execution. + requester = cfg.CONF.system_user.user + lv_ac_db, ac_ex_db = ac_svc.request_cancellation(lv_ac_db, requester) + lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id)) + self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELED) From c9c2b3c6178d408bfa2bcdb7336f77704456fa2c Mon Sep 17 00:00:00 2001 From: W Chan Date: Tue, 9 Jul 2019 03:19:11 +0000 Subject: [PATCH 2/2] Handle unknown except during cancelation Instead of rethrowing the exception, capture and log the error and traceback, and return the error in the result. Then let the cancelation continue to process children and determine final status. If the exception is rethrown, the workflow execution will be stuck in a canceling status. Add unit test to cover this scenario. --- .../orquesta_runner/orquesta_runner.py | 23 ++++++++++++----- .../orquesta_runner/tests/unit/test_cancel.py | 25 +++++++++++++++++++ 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/contrib/runners/orquesta_runner/orquesta_runner/orquesta_runner.py b/contrib/runners/orquesta_runner/orquesta_runner/orquesta_runner.py index 3a9b63979c..47f77bb0ee 100644 --- a/contrib/runners/orquesta_runner/orquesta_runner/orquesta_runner.py +++ b/contrib/runners/orquesta_runner/orquesta_runner/orquesta_runner.py @@ -14,6 +14,8 @@ from __future__ import absolute_import +import sys +import traceback import uuid import six @@ -199,6 +201,8 @@ def task_cancelable(ac_ex): return wf_ex_cancelable or ac_ex_cancelable def cancel(self): + result = None + # Try to cancel the target workflow execution. try: wf_svc.request_cancellation(self.execution) @@ -208,11 +212,18 @@ def cancel(self): except (wf_svc_exc.WorkflowExecutionNotFoundException, wf_svc_exc.WorkflowExecutionIsCompletedException): pass - # If there is an unknown exception, then log and rethrow the exception. - except Exception as e: - msg = '[%s] Unable to cancel workflow execution. %s' - LOG.error(msg, str(self.execution.id), str(e)) - raise e + # If there is an unknown exception, then log the error. Continue with the + # cancelation sequence below to cancel children and determine final status. + # If we rethrow the exception here, the workflow will be stuck in a canceling + # state with no options for user to clean up. It is safer to continue with + # the cancel then to revert back to some other statuses because the workflow + # execution will be in an unknown state. + except Exception: + _, ex, tb = sys.exc_info() + msg = 'Error encountered when canceling workflow execution. %s' + result = {'error': msg % str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))} + msg = '[%s] Error encountered when canceling workflow execution.' + LOG.exception(msg, str(self.execution.id)) # Request cancellation of tasks that are workflows and still running. for child_ex_id in self.execution.children: @@ -231,7 +242,7 @@ def cancel(self): return ( status, - self.liveaction.result, + result if result else self.liveaction.result, self.liveaction.context ) diff --git a/contrib/runners/orquesta_runner/tests/unit/test_cancel.py b/contrib/runners/orquesta_runner/tests/unit/test_cancel.py index 3dd13b1339..58383fb9bf 100644 --- a/contrib/runners/orquesta_runner/tests/unit/test_cancel.py +++ b/contrib/runners/orquesta_runner/tests/unit/test_cancel.py @@ -272,3 +272,28 @@ def test_cancel_after_wf_ex_db_completed(self): lv_ac_db, ac_ex_db = ac_svc.request_cancellation(lv_ac_db, requester) lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id)) self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELED) + + @mock.patch.object( + wf_svc, 'request_cancellation', + mock.MagicMock(side_effect=Exception('foobar'))) + def test_cancel_unexpected_exception(self): + wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'sequential.yaml') + lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name']) + lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db) + lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id)) + self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result) + + # Cancel the action execution. + requester = cfg.CONF.system_user.user + lv_ac_db, ac_ex_db = ac_svc.request_cancellation(lv_ac_db, requester) + lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id)) + + # Make sure request cancellation is called. + self.assertTrue(wf_svc.request_cancellation.called) + + # Make sure the live action and action execution still has a canceled + # status despite of cancelation failure. The other option would be + # to raise an exception and the records will be stuck in a canceling + # status and user is unable to easily clean up. + self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELED) + self.assertIn('Error encountered when canceling', lv_ac_db.result.get('error', ''))