Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,32 @@
# limitations under the License.

from __future__ import absolute_import
import eventlet
import mock

from st2common.constants.action import LIVEACTION_STATUS_SUCCEEDED
from st2common.bootstrap import actionsregistrar
from st2common.bootstrap import runnersregistrar
from st2common.constants import action as action_constants
from st2common.models.api import notification as notify_api_models
from st2common.models.db.liveaction import LiveActionDB
from st2common.models.system.common import ResourceReference
from st2common.persistence.execution import ActionExecution
from st2common.persistence.liveaction import LiveAction
from st2common.services import action as action_service
from st2common.util import action_db as action_db_util
from st2tests import ExecutionDbTestCase
from st2tests.fixturesloader import FixturesLoader
from st2tests import fixturesloader
from action_chain_runner import action_chain_runner as acr


from st2common.transport.liveaction import LiveActionPublisher
from st2common.transport.publishers import CUDPublisher

from st2tests.mocks.liveaction import MockLiveActionPublisherNonBlocking


class DummyActionExecution(object):
def __init__(self, status=LIVEACTION_STATUS_SUCCEEDED, result=""):
def __init__(self, status=action_constants.LIVEACTION_STATUS_SUCCEEDED, result=""):
self.id = None
self.status = status
self.result = result
Expand All @@ -37,17 +49,28 @@ def __init__(self, status=LIVEACTION_STATUS_SUCCEEDED, result=""):

TEST_MODELS = {"actions": ["a1.yaml", "a2.yaml"], "runners": ["testrunner1.yaml"]}

MODELS = FixturesLoader().load_models(
MODELS = fixturesloader.FixturesLoader().load_models(
fixtures_pack=FIXTURES_PACK, fixtures_dict=TEST_MODELS
)
ACTION_1 = MODELS["actions"]["a1.yaml"]
ACTION_2 = MODELS["actions"]["a2.yaml"]
RUNNER = MODELS["runners"]["testrunner1.yaml"]

CHAIN_1_PATH = FixturesLoader().get_fixture_file_path_abs(
CHAIN_1_PATH = fixturesloader.FixturesLoader().get_fixture_file_path_abs(
FIXTURES_PACK, "actionchains", "chain_with_notifications.yaml"
)

TEST_PACK = "action_chain_tests"
TEST_PACK_PATH = fixturesloader.get_fixtures_packs_base_path() + "/" + TEST_PACK

PACKS = [TEST_PACK_PATH, fixturesloader.get_fixtures_packs_base_path() + "/core"]

MOCK_NOTIFY = {
"on-complete": {
"routes": ["hubot"],
}
}


@mock.patch.object(
action_db_util, "get_runnertype_by_name", mock.MagicMock(return_value=RUNNER)
Expand All @@ -60,7 +83,29 @@ def __init__(self, status=LIVEACTION_STATUS_SUCCEEDED, result=""):
@mock.patch.object(
action_service, "is_action_paused_or_pausing", mock.MagicMock(return_value=False)
)
@mock.patch.object(CUDPublisher, "publish_update", mock.MagicMock(return_value=None))
@mock.patch.object(CUDPublisher, "publish_create", mock.MagicMock(return_value=None))
@mock.patch.object(
LiveActionPublisher,
"publish_state",
mock.MagicMock(side_effect=MockLiveActionPublisherNonBlocking.publish_state),
)
class TestActionChainNotifications(ExecutionDbTestCase):
@classmethod
def setUpClass(cls):
super(TestActionChainNotifications, cls).setUpClass()

# Register runners.
runnersregistrar.register_runners()

# Register test pack(s).
actions_registrar = actionsregistrar.ActionsRegistrar(
use_pack_cache=False, fail_on_failure=True
)

for pack in PACKS:
actions_registrar.register_from_pack(pack)

@mock.patch.object(
action_db_util, "get_action_by_ref", mock.MagicMock(return_value=ACTION_1)
)
Expand All @@ -86,3 +131,84 @@ def test_chain_runner_success_path(self, request):
second_call_args = request.call_args_list[1][0]
liveaction_db = second_call_args[0]
self.assertFalse(liveaction_db.notify, "Notify property not expected.")

def test_skip_notify_for_task_with_notify(self):
action = TEST_PACK + "." + "test_subworkflow_default_with_notify_task"
params = {"skip_notify": ["task1"]}
liveaction = LiveActionDB(action=action, parameters=params)
liveaction.notify = notify_api_models.NotificationsHelper.to_model(MOCK_NOTIFY)
liveaction, execution = action_service.request(liveaction)
liveaction = LiveAction.get_by_id(str(liveaction.id))

# Wait until the liveaction is running.
liveaction = self._wait_on_status(
liveaction, action_constants.LIVEACTION_STATUS_RUNNING
)

execution = self._wait_for_children(execution)
self.assertEqual(len(execution.children), 1)

# Assert task1 notify is skipped
task1_exec = ActionExecution.get_by_id(execution.children[0])
task1_live = LiveAction.get_by_id(task1_exec.liveaction["id"])
task1_live = self._wait_on_status(
task1_live, action_constants.LIVEACTION_STATUS_SUCCEEDED
)
self.assertIsNone(task1_live.notify)

execution = self._wait_for_children(execution, retries=300)
self.assertEqual(len(execution.children), 2)

# Assert task2 notify is not skipped
task2_exec = ActionExecution.get_by_id(execution.children[1])
task2_live = LiveAction.get_by_id(task2_exec.liveaction["id"])
notify = notify_api_models.NotificationsHelper.from_model(
notify_model=task2_live.notify
)
self.assertEqual(notify, MOCK_NOTIFY)
MockLiveActionPublisherNonBlocking.wait_all()

def test_skip_notify_default_for_task_with_notify(self):
action = TEST_PACK + "." + "test_subworkflow_default_with_notify_task"
liveaction = LiveActionDB(action=action)
liveaction.notify = notify_api_models.NotificationsHelper.to_model(MOCK_NOTIFY)
liveaction, execution = action_service.request(liveaction)
liveaction = LiveAction.get_by_id(str(liveaction.id))

# Wait until the liveaction is running.
liveaction = self._wait_on_status(
liveaction, action_constants.LIVEACTION_STATUS_RUNNING
)

execution = self._wait_for_children(execution)
self.assertEqual(len(execution.children), 1)

# Assert task1 notify is set.
task1_exec = ActionExecution.get_by_id(execution.children[0])
task1_live = LiveAction.get_by_id(task1_exec.liveaction["id"])
task1_live = self._wait_on_status(
task1_live, action_constants.LIVEACTION_STATUS_SUCCEEDED
)
notify = notify_api_models.NotificationsHelper.from_model(
notify_model=task1_live.notify
)
self.assertEqual(notify, MOCK_NOTIFY)

execution = self._wait_for_children(execution, retries=300)
self.assertEqual(len(execution.children), 2)

# Assert task2 notify is not skipped by default.
task2_exec = ActionExecution.get_by_id(execution.children[1])
task2_live = LiveAction.get_by_id(task2_exec.liveaction["id"])
self.assertIsNone(task2_live.notify)
MockLiveActionPublisherNonBlocking.wait_all()

def _wait_for_children(self, execution, interval=0.1, retries=100):
# Wait until the execution has children.
for i in range(0, retries):
execution = ActionExecution.get_by_id(str(execution.id))
if len(getattr(execution, "children", [])) <= 0:
eventlet.sleep(interval)
continue

return execution
71 changes: 71 additions & 0 deletions contrib/runners/orquesta_runner/tests/unit/test_notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,3 +348,74 @@ def test_cascade_notify_to_tasks(self):
self.assertEqual(ac_ex_db.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
self.assertTrue(notifier.Notifier._post_notify_triggers.called)
notifier.Notifier._post_notify_triggers.reset_mock()

def test_notify_task_list_for_task_with_notify(self):
wf_meta = base.get_wf_fixture_meta_data(
TEST_PACK_PATH, "subworkflow-with-notify-task.yaml"
)
lv_ac_db = lv_db_models.LiveActionDB(
action=wf_meta["name"], parameters={"notify": ["task2"]}
)
lv_ac_db.notify = notify_api_models.NotificationsHelper.to_model(MOCK_NOTIFY)
lv_ac_db, ac_ex_db = action_service.request(lv_ac_db)

# Assert action execution is running.
lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
self.assertEqual(lv_ac_db.status, action_constants.LIVEACTION_STATUS_RUNNING)
wf_ex_db = wf_db_access.WorkflowExecution.query(
action_execution=str(ac_ex_db.id)
)[0]
self.assertEqual(wf_ex_db.status, action_constants.LIVEACTION_STATUS_RUNNING)

# Assert task1 notify is not set.
query_filters = {"workflow_execution": str(wf_ex_db.id), "task_id": "task1"}
tk1_ex_db = wf_db_access.TaskExecution.query(**query_filters)[0]
tk1_ac_ex_db = ex_db_access.ActionExecution.query(
task_execution=str(tk1_ex_db.id)
)[0]
tk1_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk1_ac_ex_db.liveaction["id"])
self.assertIsNone(tk1_lv_ac_db.notify)
# Assert task2 notify is set.
query_filters = {"workflow_execution": str(wf_ex_db.id), "task_id": "task2"}
tk2_ex_db = wf_db_access.TaskExecution.query(**query_filters)[0]
tk2_ac_ex_db = ex_db_access.ActionExecution.query(
task_execution=str(tk2_ex_db.id)
)[0]
tk2_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk2_ac_ex_db.liveaction["id"])
notify = notify_api_models.NotificationsHelper.from_model(
notify_model=tk2_lv_ac_db.notify
)
self.assertEqual(notify, MOCK_NOTIFY)

def test_no_notify_for_task_with_notify(self):
wf_meta = base.get_wf_fixture_meta_data(
TEST_PACK_PATH, "subworkflow-with-notify-task.yaml"
)
lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta["name"])
lv_ac_db, ac_ex_db = action_service.request(lv_ac_db)

# Assert action execution is running.
lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
self.assertEqual(lv_ac_db.status, action_constants.LIVEACTION_STATUS_RUNNING)
wf_ex_db = wf_db_access.WorkflowExecution.query(
action_execution=str(ac_ex_db.id)
)[0]
self.assertEqual(wf_ex_db.status, action_constants.LIVEACTION_STATUS_RUNNING)

# Assert task1 notify is not set.
query_filters = {"workflow_execution": str(wf_ex_db.id), "task_id": "task1"}
tk1_ex_db = wf_db_access.TaskExecution.query(**query_filters)[0]
tk1_ac_ex_db = ex_db_access.ActionExecution.query(
task_execution=str(tk1_ex_db.id)
)[0]
tk1_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk1_ac_ex_db.liveaction["id"])
self.assertIsNone(tk1_lv_ac_db.notify)

# Assert task2 notify is not set.
query_filters = {"workflow_execution": str(wf_ex_db.id), "task_id": "task2"}
tk2_ex_db = wf_db_access.TaskExecution.query(**query_filters)[0]
tk2_ac_ex_db = ex_db_access.ActionExecution.query(
task_execution=str(tk2_ex_db.id)
)[0]
tk2_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk2_ac_ex_db.liveaction["id"])
self.assertIsNone(tk2_lv_ac_db.notify)
56 changes: 55 additions & 1 deletion st2common/st2common/services/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
from st2common.persistence.liveaction import LiveAction
from st2common.persistence.execution import ActionExecution
from st2common.persistence.execution import ActionExecutionOutput
from st2common.persistence.workflow import TaskExecution
from st2common.persistence.workflow import WorkflowExecution
from st2common.models.db.execution import ActionExecutionOutputDB
from st2common.runners import utils as runners_utils
from st2common.services import executions
from st2common.services import trace as trace_service
from st2common.services import workflows as workflow_service
from st2common.util import date as date_utils
from st2common.util import action_db as action_utils
from st2common.util import schema as util_schema
Expand Down Expand Up @@ -130,7 +133,7 @@ def create_request(liveaction, action_db=None, runnertype_db=None):
# XXX: There are cases when we don't want notifications to be sent for a particular
# execution. So we should look at liveaction.parameters['notify']
# and not set liveaction.notify.
if not _is_notify_empty(action_db.notify):
if not _is_notify_skipped(liveaction) and not _is_notify_empty(action_db.notify):
liveaction.notify = action_db.notify

# Write to database and send to message queue.
Expand Down Expand Up @@ -548,3 +551,54 @@ def _is_notify_empty(notify_db):
if not notify_db:
return True
return not (notify_db.on_complete or notify_db.on_success or notify_db.on_failure)


def _is_notify_skipped(liveaction):
"""
notification is skipped if action execution is under workflow context and
task is not specified under wf_ex_db.notify["tasks"].
"""
is_under_workflow_context = (
workflow_service.is_action_execution_under_workflow_context(liveaction)
)
is_under_action_chain_context = is_action_execution_under_action_chain_context(
liveaction
)
if is_under_workflow_context:
wf_ex_db = WorkflowExecution.get(
id=liveaction.workflow_execution, only_fields=["notify"]
)
task_ex_db = TaskExecution.get(
id=liveaction.task_execution, only_fields=["task_name"]
)
return not wf_ex_db.notify or task_ex_db.task_name not in wf_ex_db.notify.get(
"tasks", {}
)
if is_under_action_chain_context:
task_name = liveaction.context["chain"]["name"]
parent = liveaction.context.get("parent")
if parent:
parent_execution_db = ActionExecution.get(
id=parent["execution_id"],
only_fields=["action.parameters", "parameters"],
)
skip_notify_tasks = parent_execution_db["parameters"].get("skip_notify", [])
default_skip_notify_tasks = parent_execution_db["action"]["parameters"].get(
"skip_notify", {}
)
if skip_notify_tasks:
if task_name in skip_notify_tasks:
return True
# If skip_notify parameter is specified, but task is not skipped.
return False
# If skip_notify parameter is not specified, check the task in default list.
return task_name in default_skip_notify_tasks.get("default", [])
return False


def is_action_execution_under_action_chain_context(liveaction):
"""
The action execution is executed under action-chain context
if it contains the chain key in its context dictionary.
"""
return liveaction.context and "chain" in liveaction.context
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
chain:
-
name: task1
ref: core.local
params:
cmd: echo foobar

Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
chain:
-
name: task1
ref: action_chain_tests.test_chain_sequential
on-success: task2

-
name: task2
ref: core.local
params:
cmd: echo foobar

Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
name: test_chain_sequential
description: A sample workflow which has notify.
pack: action_chain_tests
runner_type: action-chain
entry_point: chains/test_chain_sequential.yaml
enabled: true
notify:
on-complete:
routes:
- hubot
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
name: test_subworkflow_default_with_notify_task
description: A sample workflow that calls another subworkflow which has notify.
pack: action_chain_tests
runner_type: action-chain
entry_point: chains/test_subworkflow_default_with_notify_task.yaml
enabled: true
parameters:
skip_notify:
default:
- task2
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
---
name: sequential-notify
description: A basic sequential workflow with notify on-complete.
pack: orquesta_tests
runner_type: orquesta
entry_point: workflows/sequential.yaml
enabled: true
notify:
on-complete:
routes:
- hubot
parameters:
who:
required: true
type: string
default: Stanley

Loading