Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ Changelog
in development
--------------

Fixed
~~~~~

* Fix the workflow execution cancelation to proceed even if the workflow execution is not found or
has already completed. (bug fix) #4735


3.1.0 - June 27, 2019
---------------------
Expand Down
30 changes: 27 additions & 3 deletions contrib/runners/orquesta_runner/orquesta_runner/orquesta_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

from __future__ import absolute_import

import sys
import traceback
import uuid

import six
Expand All @@ -24,6 +26,7 @@

from st2common.constants import action as ac_const
from st2common import log as logging
from st2common.exceptions import workflow as wf_svc_exc
from st2common.models.api import notification as notify_api_models
from st2common.persistence import execution as ex_db_access
from st2common.persistence import liveaction as lv_db_access
Expand Down Expand Up @@ -198,8 +201,29 @@ def task_cancelable(ac_ex):
return wf_ex_cancelable or ac_ex_cancelable

def cancel(self):
# Cancel the target workflow.
wf_svc.request_cancellation(self.execution)
result = None

# Try to cancel the target workflow execution.
try:
wf_svc.request_cancellation(self.execution)
# If workflow execution is not found because the action execution is cancelled
# before the workflow execution is created or if the workflow execution is
# already completed, then ignore the exception and proceed with cancellation.
except (wf_svc_exc.WorkflowExecutionNotFoundException,
wf_svc_exc.WorkflowExecutionIsCompletedException):
pass
# If there is an unknown exception, then log the error. Continue with the
# cancelation sequence below to cancel children and determine final status.
# If we rethrow the exception here, the workflow will be stuck in a canceling
# state with no options for user to clean up. It is safer to continue with
# the cancel than to revert back to some other statuses because the workflow
# execution will be in an unknown state.
except Exception:
_, ex, tb = sys.exc_info()
msg = 'Error encountered when canceling workflow execution. %s'
result = {'error': msg % str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
msg = '[%s] Error encountered when canceling workflow execution.'
LOG.exception(msg, str(self.execution.id))

# Request cancellation of tasks that are workflows and still running.
for child_ex_id in self.execution.children:
Expand All @@ -218,7 +242,7 @@ def cancel(self):

return (
status,
self.liveaction.result,
result if result else self.liveaction.result,
self.liveaction.context
)

Expand Down
62 changes: 62 additions & 0 deletions contrib/runners/orquesta_runner/tests/unit/test_cancel.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import st2tests

from orquesta import statuses as wf_ex_statuses
from oslo_config import cfg

# XXX: actionsensor import depends on config being setup.
Expand Down Expand Up @@ -235,3 +236,64 @@ def test_cancel_subworkflow_cascade_up_to_workflow_with_other_subworkflows(self)
# Assert the main workflow is canceling.
lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELED)

def test_cancel_before_wf_ex_db_created(self):
    # Request the sequential workflow fixture and confirm the live action is running.
    fixture_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'sequential.yaml')
    pending = lv_db_models.LiveActionDB(action=fixture_meta['name'])
    requested, ac_ex_db = ac_svc.request(pending)
    running = lv_db_access.LiveAction.get_by_id(str(requested.id))
    self.assertEqual(running.status, ac_const.LIVEACTION_STATUS_RUNNING, running.result)

    # Delete the workflow execution to mock the issue where the record
    # has not been created yet when the cancelation is requested.
    matches = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))
    wf_db_access.WorkflowExecution.delete(matches[0], publish=False, dispatch_trigger=False)

    # Cancel the action execution as the system user and reload the live action.
    canceling, _ = ac_svc.request_cancellation(running, cfg.CONF.system_user.user)
    final = lv_db_access.LiveAction.get_by_id(str(canceling.id))

    # The live action is expected to end up canceled.
    self.assertEqual(final.status, ac_const.LIVEACTION_STATUS_CANCELED)

def test_cancel_after_wf_ex_db_completed(self):
    # Request the sequential workflow fixture and confirm the live action is running.
    fixture_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'sequential.yaml')
    pending = lv_db_models.LiveActionDB(action=fixture_meta['name'])
    requested, ac_ex_db = ac_svc.request(pending)
    running = lv_db_access.LiveAction.get_by_id(str(requested.id))
    self.assertEqual(running.status, ac_const.LIVEACTION_STATUS_RUNNING, running.result)

    # Mark the workflow execution as succeeded to mock the issue where the
    # workflow is already completed but the liveaction and action execution
    # have not had time to be updated.
    completed = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))[0]
    completed.status = wf_ex_statuses.SUCCEEDED
    wf_db_access.WorkflowExecution.update(completed, publish=False)

    # Cancel the action execution as the system user and reload the live action.
    canceling, _ = ac_svc.request_cancellation(running, cfg.CONF.system_user.user)
    final = lv_db_access.LiveAction.get_by_id(str(canceling.id))

    # The live action is expected to end up canceled.
    self.assertEqual(final.status, ac_const.LIVEACTION_STATUS_CANCELED)

@mock.patch.object(
    wf_svc,
    'request_cancellation',
    mock.MagicMock(side_effect=Exception('foobar')),
)
def test_cancel_unexpected_exception(self):
    # Request the sequential workflow fixture and confirm the live action is running.
    fixture_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'sequential.yaml')
    pending = lv_db_models.LiveActionDB(action=fixture_meta['name'])
    requested, _ = ac_svc.request(pending)
    running = lv_db_access.LiveAction.get_by_id(str(requested.id))
    self.assertEqual(running.status, ac_const.LIVEACTION_STATUS_RUNNING, running.result)

    # Cancel the action execution as the system user. The patched
    # wf_svc.request_cancellation raises a generic exception when called.
    canceling, _ = ac_svc.request_cancellation(running, cfg.CONF.system_user.user)
    final = lv_db_access.LiveAction.get_by_id(str(canceling.id))

    # Make sure the workflow cancellation request was actually attempted.
    self.assertTrue(wf_svc.request_cancellation.called)

    # The live action should still be canceled despite the cancelation
    # failure. The other option would be to raise an exception, which leaves
    # the records stuck in a canceling status that the user cannot easily
    # clean up. The failure is reported in the result instead.
    self.assertEqual(final.status, ac_const.LIVEACTION_STATUS_CANCELED)
    self.assertIn('Error encountered when canceling', final.result.get('error', ''))