Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 13 additions & 14 deletions contrib/runners/orquesta_runner/tests/unit/test_error_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

from tests.unit import base

from local_runner import local_shell_command_runner
from st2common.bootstrap import actionsregistrar
from st2common.bootstrap import runnersregistrar
from st2common.constants import action as ac_const
Expand Down Expand Up @@ -59,6 +60,12 @@
st2tests.fixturesloader.get_fixtures_packs_base_path() + "/core",
]

RUNNER_RESULT_FAILED = (
ac_const.LIVEACTION_STATUS_FAILED,
{"127.0.0.1": {"hostname": "foobar"}},
{},
)


@mock.patch.object(
publishers.CUDPublisher, "publish_update", mock.MagicMock(return_value=None)
Expand Down Expand Up @@ -954,6 +961,11 @@ def test_fail_manually_with_recovery_failure(self):
@mock.patch.object(
runners_utils, "invoke_post_run", mock.MagicMock(return_value=None)
)
@mock.patch.object(
local_shell_command_runner.LocalShellCommandRunner,
"run",
mock.MagicMock(side_effect=[RUNNER_RESULT_FAILED]),
)
def test_include_result_to_error_log(self):
username = "stanley"
wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, "sequential.yaml")
Expand Down Expand Up @@ -981,24 +993,11 @@ def test_include_result_to_error_log(self):
)[0]
tk1_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk1_ac_ex_db.liveaction["id"])
self.assertEqual(tk1_lv_ac_db.context.get("user"), username)
self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED)

# Manually override and fail the action execution and write some result.
# Action execution result can contain dotted notation so ensure this is tested.
result = {"127.0.0.1": {"hostname": "foobar"}}

ac_svc.update_status(
tk1_lv_ac_db,
ac_const.LIVEACTION_STATUS_FAILED,
result=result,
publish=False,
)

tk1_ac_ex_db = ex_db_access.ActionExecution.query(
task_execution=str(tk1_ex_db.id)
)[0]
tk1_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk1_ac_ex_db.liveaction["id"])
self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED)
self.assertDictEqual(tk1_lv_ac_db.result, result)

# Manually handle action execution completion.
Expand Down
22 changes: 15 additions & 7 deletions contrib/runners/orquesta_runner/tests/unit/test_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from tests.unit import base

import st2common
from local_runner import local_shell_command_runner
from st2actions.notifier import notifier
from st2actions.workflows import workflows
from st2common.bootstrap import actionsregistrar
Expand Down Expand Up @@ -57,6 +58,8 @@
st2tests.fixturesloader.get_fixtures_packs_base_path() + "/core",
]

RUNNER_RESULT_FAILED = (ac_const.LIVEACTION_STATUS_FAILED, {"stderror": "..."}, {})


@mock.patch.object(
publishers.CUDPublisher, "publish_update", mock.MagicMock(return_value=None)
Expand Down Expand Up @@ -114,6 +117,11 @@ def tearDown(self):
for ac_ex_db in ex_db_access.ActionExecution.get_all():
ac_ex_db.delete()

@mock.patch.object(
local_shell_command_runner.LocalShellCommandRunner,
"run",
mock.MagicMock(side_effect=[RUNNER_RESULT_FAILED]),
)
def test_retry_policy_applied_on_workflow_failure(self):
wf_name = "sequential"
wf_ac_ref = TEST_PACK + "." + wf_name
Expand All @@ -136,16 +144,11 @@ def test_retry_policy_applied_on_workflow_failure(self):
workflow_execution=str(wf_ex_db.id)
)[0]
t1_lv_ac_db = lv_db_access.LiveAction.query(task_execution=str(t1_ex_db.id))[0]
self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED)
t1_ac_ex_db = ex_db_access.ActionExecution.query(
task_execution=str(t1_ex_db.id)
)[0]

# Manually set the status to fail.
ac_svc.update_status(t1_lv_ac_db, ac_const.LIVEACTION_STATUS_FAILED)
t1_lv_ac_db = lv_db_access.LiveAction.query(task_execution=str(t1_ex_db.id))[0]
t1_ac_ex_db = ex_db_access.ActionExecution.query(
task_execution=str(t1_ex_db.id)
)[0]
self.assertEqual(t1_ac_ex_db.status, ac_const.LIVEACTION_STATUS_FAILED)
notifier.get_notifier().process(t1_ac_ex_db)
workflows.get_engine().process(t1_ac_ex_db)
Expand All @@ -158,6 +161,11 @@ def test_retry_policy_applied_on_workflow_failure(self):
# Ensure execution is retried.
self.assertEqual(len(lv_db_access.LiveAction.query(action=wf_ac_ref)), 2)

@mock.patch.object(
local_shell_command_runner.LocalShellCommandRunner,
"run",
mock.MagicMock(side_effect=[RUNNER_RESULT_FAILED]),
)
def test_no_retry_policy_applied_on_task_failure(self):
wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, "subworkflow.yaml")
lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta["name"])
Expand Down Expand Up @@ -197,7 +205,7 @@ def test_no_retry_policy_applied_on_task_failure(self):
t1_t1_lv_ac_db = lv_db_access.LiveAction.query(
task_execution=str(t1_t1_ex_db.id)
)[0]
ac_svc.update_status(t1_t1_lv_ac_db, ac_const.LIVEACTION_STATUS_FAILED)
self.assertEqual(t1_t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED)
t1_t1_ac_ex_db = ex_db_access.ActionExecution.query(
task_execution=str(t1_t1_ex_db.id)
)[0]
Expand Down
16 changes: 12 additions & 4 deletions contrib/runners/orquesta_runner/tests/unit/test_rerun.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
from st2common.persistence import liveaction as lv_db_access
from st2common.persistence import workflow as wf_db_access
from st2common.services import action as action_service
from st2common.services import executions as execution_service
from st2common.services import workflows as workflow_service
from st2common.transport import liveaction as lv_ac_xport
from st2common.transport import workflow as wf_ex_xport
Expand All @@ -52,7 +51,11 @@
st2tests.fixturesloader.get_fixtures_packs_base_path() + "/core",
]

RUNNER_RESULT_FAILED = (action_constants.LIVEACTION_STATUS_FAILED, {}, {})
RUNNER_RESULT_FAILED = (
action_constants.LIVEACTION_STATUS_FAILED,
{"stderror": "..."},
{},
)
RUNNER_RESULT_RUNNING = (
action_constants.LIVEACTION_STATUS_RUNNING,
{"stdout": "..."},
Expand Down Expand Up @@ -216,10 +219,15 @@ def test_rerun_with_missing_workflow_execution_id(self):
# Delete the workflow execution.
wf_db_access.WorkflowExecution.delete(wf_ex_db, publish=False)

# Manually delete the workflow_execution_id from context of the action execution.
# Manually delete the workflow_execution_id from context of the liveaction.
lv_ac_db1.context.pop("workflow_execution")
lv_ac_db1 = lv_db_access.LiveAction.add_or_update(lv_ac_db1, publish=False)
ac_ex_db1 = execution_service.update_execution(lv_ac_db1, publish=False)
# Manually delete the workflow_execution_id from context of the action execution.
# We cannot use execution_service.update_execution here because by the time we reach
# execution_service.update_execution, action is already in completed state.
# Popping of workflow id and and updating the execution object will not work.
ac_ex_db1.context.pop("workflow_execution")
ac_ex_db1 = ex_db_access.ActionExecution.add_or_update(ac_ex_db1, publish=False)

# Rerun the execution.
context = {"re-run": {"ref": str(ac_ex_db1.id), "tasks": ["task1"]}}
Expand Down
79 changes: 39 additions & 40 deletions contrib/runners/orquesta_runner/tests/unit/test_with_items.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

from tests.unit import base

from local_runner import local_shell_command_runner
from st2actions.workflows import workflows
from st2common.bootstrap import actionsregistrar
from st2common.bootstrap import runnersregistrar
Expand Down Expand Up @@ -59,6 +60,18 @@
st2tests.fixturesloader.get_fixtures_packs_base_path() + "/core",
]

RUNNER_RESULT_RUNNING = (
action_constants.LIVEACTION_STATUS_RUNNING,
{"stdout": "..."},
{},
)

RUNNER_RESULT_SUCCEEDED = (
action_constants.LIVEACTION_STATUS_SUCCEEDED,
{"stdout": "..."},
{},
)


@mock.patch.object(
publishers.CUDPublisher, "publish_update", mock.MagicMock(return_value=None)
Expand Down Expand Up @@ -315,6 +328,11 @@ def test_with_items_concurrency(self):
lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
self.assertEqual(lv_ac_db.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)

@mock.patch.object(
local_shell_command_runner.LocalShellCommandRunner,
"run",
mock.MagicMock(return_value=RUNNER_RESULT_RUNNING),
)
def test_with_items_cancellation(self):
num_items = 3

Expand All @@ -340,16 +358,6 @@ def test_with_items_cancellation(self):
self.assertEqual(t1_ex_db.status, wf_statuses.RUNNING)
self.assertEqual(len(t1_ac_ex_dbs), num_items)

# Reset the action executions to running status.
for ac_ex in t1_ac_ex_dbs:
self.set_execution_status(
ac_ex.liveaction["id"], action_constants.LIVEACTION_STATUS_RUNNING
)

t1_ac_ex_dbs = ex_db_access.ActionExecution.query(
task_execution=str(t1_ex_db.id)
)

status = [
ac_ex.status == action_constants.LIVEACTION_STATUS_RUNNING
for ac_ex in t1_ac_ex_dbs
Expand Down Expand Up @@ -389,6 +397,11 @@ def test_with_items_cancellation(self):
lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
self.assertEqual(lv_ac_db.status, action_constants.LIVEACTION_STATUS_CANCELED)

@mock.patch.object(
local_shell_command_runner.LocalShellCommandRunner,
"run",
mock.MagicMock(return_value=RUNNER_RESULT_RUNNING),
)
def test_with_items_concurrency_cancellation(self):
concurrency = 2

Expand Down Expand Up @@ -417,16 +430,6 @@ def test_with_items_concurrency_cancellation(self):
self.assertEqual(t1_ex_db.status, wf_statuses.RUNNING)
self.assertEqual(len(t1_ac_ex_dbs), concurrency)

# Reset the action executions to running status.
for ac_ex in t1_ac_ex_dbs:
self.set_execution_status(
ac_ex.liveaction["id"], action_constants.LIVEACTION_STATUS_RUNNING
)

t1_ac_ex_dbs = ex_db_access.ActionExecution.query(
task_execution=str(t1_ex_db.id)
)

status = [
ac_ex.status == action_constants.LIVEACTION_STATUS_RUNNING
for ac_ex in t1_ac_ex_dbs
Expand Down Expand Up @@ -466,6 +469,11 @@ def test_with_items_concurrency_cancellation(self):
lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
self.assertEqual(lv_ac_db.status, action_constants.LIVEACTION_STATUS_CANCELED)

@mock.patch.object(
local_shell_command_runner.LocalShellCommandRunner,
"run",
mock.MagicMock(return_value=RUNNER_RESULT_RUNNING),
)
def test_with_items_pause_and_resume(self):
num_items = 3

Expand All @@ -491,16 +499,6 @@ def test_with_items_pause_and_resume(self):
self.assertEqual(t1_ex_db.status, wf_statuses.RUNNING)
self.assertEqual(len(t1_ac_ex_dbs), num_items)

# Reset the action executions to running status.
for ac_ex in t1_ac_ex_dbs:
self.set_execution_status(
ac_ex.liveaction["id"], action_constants.LIVEACTION_STATUS_RUNNING
)

t1_ac_ex_dbs = ex_db_access.ActionExecution.query(
task_execution=str(t1_ex_db.id)
)

status = [
ac_ex.status == action_constants.LIVEACTION_STATUS_RUNNING
for ac_ex in t1_ac_ex_dbs
Expand Down Expand Up @@ -551,6 +549,17 @@ def test_with_items_pause_and_resume(self):
lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
self.assertEqual(lv_ac_db.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)

@mock.patch.object(
local_shell_command_runner.LocalShellCommandRunner,
"run",
mock.MagicMock(
side_effect=[
RUNNER_RESULT_RUNNING,
RUNNER_RESULT_RUNNING,
RUNNER_RESULT_SUCCEEDED,
]
),
)
def test_with_items_concurrency_pause_and_resume(self):
num_items = 3
concurrency = 2
Expand Down Expand Up @@ -580,16 +589,6 @@ def test_with_items_concurrency_pause_and_resume(self):
self.assertEqual(t1_ex_db.status, wf_statuses.RUNNING)
self.assertEqual(len(t1_ac_ex_dbs), concurrency)

# Reset the action executions to running status.
for ac_ex in t1_ac_ex_dbs:
self.set_execution_status(
ac_ex.liveaction["id"], action_constants.LIVEACTION_STATUS_RUNNING
)

t1_ac_ex_dbs = ex_db_access.ActionExecution.query(
task_execution=str(t1_ex_db.id)
)

status = [
ac_ex.status == action_constants.LIVEACTION_STATUS_RUNNING
for ac_ex in t1_ac_ex_dbs
Expand Down
9 changes: 9 additions & 0 deletions st2common/st2common/services/executions.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,15 @@ def update_execution(liveaction_db, publish=True, set_result_size=False):
on the "result_size" database field.
"""
execution = ActionExecution.get(liveaction__id=str(liveaction_db.id))

# Skip execution object update when action is already in completed state.
if execution.status in action_constants.LIVEACTION_COMPLETED_STATES:
LOG.debug(
"[%s] Action is already in completed state: %s. Skipping execution update to state: %s."
% (execution.id, execution.status, liveaction_db.status)
)
return execution

decomposed = _decompose_liveaction(liveaction_db)

kw = {}
Expand Down
16 changes: 16 additions & 0 deletions st2common/tests/unit/test_executions_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,22 @@ def test_execution_update(self):
self.assertGreater(execution.log[1]["timestamp"], pre_update_timestamp)
self.assertLess(execution.log[1]["timestamp"], post_update_timestamp)

def test_skip_execution_update(self):
liveaction = self.MODELS["liveactions"]["successful_liveaction.yaml"]
executions_util.create_execution_object(liveaction)
pre_update_status = liveaction.status
liveaction.status = "running"
executions_util.update_execution(liveaction)
execution = self._get_action_execution(
liveaction__id=str(liveaction.id), raise_exception=True
)
self.assertEqual(len(execution.log), 1)
# Check status is not updated if it's already in completed state.
self.assertEqual(
pre_update_status, action_constants.LIVEACTION_STATUS_SUCCEEDED
)
self.assertEqual(execution.log[0]["status"], pre_update_status)

@mock.patch.object(PoolPublisher, "publish", mock.MagicMock())
@mock.patch.object(
runners_utils, "invoke_post_run", mock.MagicMock(return_value=None)
Expand Down