4 changes: 4 additions & 0 deletions CHANGELOG.rst
@@ -77,6 +77,10 @@ Fixed
* Correct error reported when encrypted key value is reported, and another key value parameter that requires conversion is present. #5328
Contributed by @amanda11, Ammeon Solutions

+* Make ``update_execution()`` atomic by protecting the update with a coordination lock. Actions, like workflows, may have multiple
+  concurrent updates to their execution state. This makes those updates safer, which should make the execution status more reliable. #5358
+
+  Contributed by @khushboobhatia01


3.5.0 - June 23, 2021
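As context for the entry above, the coordination lock comes from the tooz library that backs st2's coordination service. A minimal sketch of the underlying pattern follows; the backend URL, member id, and lock name are illustrative placeholders, not values from this PR:

    from tooz import coordination

    # Illustrative placeholders: st2 derives these from its own configuration.
    coordinator = coordination.get_coordinator("zake://", b"member-1")
    coordinator.start()

    lock = coordinator.get_lock(b"execution-12345")
    with lock:
        # Only one holder at a time runs this read-modify-write block.
        pass

    coordinator.stop()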
30 changes: 22 additions & 8 deletions st2actions/tests/unit/test_workflow_engine.py
@@ -152,12 +152,8 @@ def test_process(self):
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
        self.assertEqual(lv_ac_db.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)

-    @mock.patch.object(
-        coordination_service.NoOpDriver,
-        "get_lock",
-        mock.MagicMock(side_effect=coordination.ToozConnectionError("foobar")),
-    )
-    def test_process_error_handling(self):
+    @mock.patch.object(coordination_service.NoOpDriver, "get_lock")
+    def test_process_error_handling(self, mock_get_lock):
        expected_errors = [
            {
                "message": "Execution failed. See result for details.",
@@ -172,6 +168,7 @@
            },
        ]

+        mock_get_lock.side_effect = coordination_service.NoOpLock(name="noop")
        wf_meta = self.get_wf_fixture_meta_data(TEST_PACK_PATH, "sequential.yaml")
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta["name"])
        lv_ac_db, ac_ex_db = action_service.request(lv_ac_db)
@@ -190,6 +187,15 @@
        t1_ac_ex_db = ex_db_access.ActionExecution.query(
            task_execution=str(t1_ex_db.id)
        )[0]
+        mock_get_lock.side_effect = [
+            coordination.ToozConnectionError("foobar"),
+            coordination.ToozConnectionError("foobar"),
+            coordination.ToozConnectionError("foobar"),
+            coordination.ToozConnectionError("foobar"),
+            coordination.ToozConnectionError("foobar"),
+            coordination_service.NoOpLock(name="noop"),
+            coordination_service.NoOpLock(name="noop"),
+        ]
        workflows.get_engine().process(t1_ac_ex_db)

        # Assert the task is marked as failed.
@@ -206,14 +212,14 @@
    @mock.patch.object(
        coordination_service.NoOpDriver,
        "get_lock",
-        mock.MagicMock(side_effect=coordination.ToozConnectionError("foobar")),
    )
    @mock.patch.object(
        workflows.WorkflowExecutionHandler,
        "fail_workflow_execution",
        mock.MagicMock(side_effect=Exception("Unexpected error.")),
    )
-    def test_process_error_handling_has_error(self):
+    def test_process_error_handling_has_error(self, mock_get_lock):
+        mock_get_lock.side_effect = coordination_service.NoOpLock(name="noop")
        wf_meta = self.get_wf_fixture_meta_data(TEST_PACK_PATH, "sequential.yaml")
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta["name"])
        lv_ac_db, ac_ex_db = action_service.request(lv_ac_db)
@@ -233,13 +239,21 @@
            task_execution=str(t1_ex_db.id)
        )[0]

+        mock_get_lock.side_effect = [
+            coordination.ToozConnectionError("foobar"),
+            coordination.ToozConnectionError("foobar"),
+            coordination.ToozConnectionError("foobar"),
+            coordination.ToozConnectionError("foobar"),
+            coordination.ToozConnectionError("foobar"),
+        ]
        self.assertRaisesRegexp(
            Exception, "Unexpected error.", workflows.get_engine().process, t1_ac_ex_db
        )

        self.assertTrue(
            workflows.WorkflowExecutionHandler.fail_workflow_execution.called
        )
+        mock_get_lock.side_effect = coordination_service.NoOpLock(name="noop")

        # Since error handling failed, the workflow will have status of running.
        wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(wf_ex_db.id)
68 changes: 36 additions & 32 deletions st2common/st2common/services/executions.py
@@ -48,6 +48,7 @@
from st2common.models.db.execution import ActionExecutionDB
from st2common.runners import utils as runners_utils
from st2common.metrics.base import Timer
+from st2common.services import coordination
from six.moves import range


@@ -195,39 +196,42 @@ def update_execution(liveaction_db, publish=True, set_result_size=False):
"""
execution = ActionExecution.get(liveaction__id=str(liveaction_db.id))

-    # Skip execution object update when action is already in completed state.
-    if execution.status in action_constants.LIVEACTION_COMPLETED_STATES:
-        LOG.debug(
-            "[%s] Action is already in completed state: %s. Skipping execution update to state: %s."
-            % (execution.id, execution.status, liveaction_db.status)
-        )
-        return execution
-
-    decomposed = _decompose_liveaction(liveaction_db)
-
-    kw = {}
-    for k, v in six.iteritems(decomposed):
-        kw["set__" + k] = v
-
-    if liveaction_db.status != execution.status:
-        # Note: If the status changes we store this transition in the "log" attribute of action
-        # execution
-        kw["push__log"] = _create_execution_log_entry(liveaction_db.status)
-
-    if set_result_size:
-        # Sadly with the current ORM abstraction there is no better way to achieve updating
-        # result_size and we need to serialize the value again - luckily that operation is fast.
-        # To put things into perspective - on 4 MB result dictionary it only takes 7 ms which is
-        # negligible compared to other DB operations duration (and for smaller results it takes
-        # in sub ms range).
-        with Timer(key="action.executions.calculate_result_size"):
-            result_size = len(
-                ActionExecutionDB.result._serialize_field_value(liveaction_db.result)
-            )
-        kw["set__result_size"] = result_size
-
-    execution = ActionExecution.update(execution, publish=publish, **kw)
-    return execution
+    with coordination.get_coordinator().get_lock(liveaction_db.id):
[Review comment from cognifloyd (Member), on the new get_lock() call]

Could we add a prefix to this to scope the change just to this block? Or are there other places already using the live action id as the lock name that you also want to block?

[Reply from khushboobhatia01 (Contributor, Author)]

@cognifloyd No, get_lock() is not being used anywhere else.
And to answer why tests are failing: https://github.com/StackStorm/st2/blob/master/st2actions/tests/unit/test_workflow_engine.py#L155-L159 mocks get_lock() to raise ToozConnectionError (to test ``with coord_svc.get_coordinator(start_heart=True).get_lock(wf_ex_id):``).

Because of the above mocking, action execution update calls will fail with my changes and cause the test assertions to fail. I'm working on the fix for this.
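The fix described in this thread relies on mock's side_effect sequencing, which the reworked tests use throughout: assigning a list to side_effect makes each successive call consume the next element, raising exception instances and returning anything else. That is why the tests enumerate one ToozConnectionError or NoOpLock per expected get_lock() call. A self-contained illustration (the lock name and retry loop are illustrative):

    from unittest import mock

    from tooz import coordination

    get_lock = mock.MagicMock()
    get_lock.side_effect = [
        coordination.ToozConnectionError("transient"),  # first call raises
        coordination.ToozConnectionError("transient"),  # second call raises
        "a-lock",                                       # third call returns
    ]

    for attempt in range(3):
        try:
            print(get_lock("some-lock-name"))  # prints "a-lock" on the third try
            break
        except coordination.ToozConnectionError:
            continue  # retry, as the workflow service does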

+        # Skip execution object update when action is already in completed state.
+        if execution.status in action_constants.LIVEACTION_COMPLETED_STATES:
+            LOG.debug(
+                "[%s] Action is already in completed state: %s. Skipping execution update to state: %s."
+                % (execution.id, execution.status, liveaction_db.status)
+            )
+            return execution

+        decomposed = _decompose_liveaction(liveaction_db)
+
+        kw = {}
+        for k, v in six.iteritems(decomposed):
+            kw["set__" + k] = v
+
+        if liveaction_db.status != execution.status:
+            # Note: If the status changes we store this transition in the "log" attribute of action
+            # execution
+            kw["push__log"] = _create_execution_log_entry(liveaction_db.status)
+
+        if set_result_size:
+            # Sadly with the current ORM abstraction there is no better way to achieve updating
+            # result_size and we need to serialize the value again - luckily that operation is fast.
+            # To put things into perspective - on 4 MB result dictionary it only takes 7 ms which is
+            # negligible compared to other DB operations duration (and for smaller results it takes
+            # in sub ms range).
+            with Timer(key="action.executions.calculate_result_size"):
+                result_size = len(
+                    ActionExecutionDB.result._serialize_field_value(
+                        liveaction_db.result
+                    )
+                )
+            kw["set__result_size"] = result_size
+
+        execution = ActionExecution.update(execution, publish=publish, **kw)
+        return execution


def abandon_execution_if_incomplete(liveaction_id, publish=True):
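The net effect of the executions.py change above: the read of the execution document and the subsequent write now happen under a per-execution lock, so two concurrent updaters (for example, a workflow engine and an action runner) can no longer interleave and clobber each other's status writes. A condensed sketch of the pattern, reusing the imports shown in the diff; the helper name is illustrative, not an st2 API:

    from st2common.constants import action as action_constants
    from st2common.persistence.execution import ActionExecution
    from st2common.services import coordination


    def update_execution_serialized(liveaction_db, **updates):
        # Illustrative helper, not an st2 API: serialize the read-modify-write
        # on the execution document. The lock is scoped by the live action id,
        # so different executions still update in parallel.
        with coordination.get_coordinator().get_lock(liveaction_db.id):
            execution = ActionExecution.get(liveaction__id=str(liveaction_db.id))
            if execution.status in action_constants.LIVEACTION_COMPLETED_STATES:
                # Never let a stale update downgrade a terminal status.
                return execution
            return ActionExecution.update(execution, **updates)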
41 changes: 23 additions & 18 deletions st2common/tests/unit/services/test_workflow_service_retries.py
@@ -134,18 +134,9 @@ def setUpClass(cls):
        for pack in PACKS:
            actions_registrar.register_from_pack(pack)

-    @mock.patch.object(
-        coord_svc.NoOpDriver,
-        "get_lock",
-        mock.MagicMock(
-            side_effect=[
-                coordination.ToozConnectionError("foobar"),
-                coordination.ToozConnectionError("fubar"),
-                coord_svc.NoOpLock(name="noop"),
-            ]
-        ),
-    )
-    def test_recover_from_coordinator_connection_error(self):
+    @mock.patch.object(coord_svc.NoOpDriver, "get_lock")
+    def test_recover_from_coordinator_connection_error(self, mock_get_lock):
+        mock_get_lock.side_effect = coord_svc.NoOpLock(name="noop")
        wf_meta = self.get_wf_fixture_meta_data(TEST_PACK_PATH, "sequential.yaml")
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta["name"])
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
@@ -161,18 +152,25 @@
        )[0]
        tk1_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk1_ac_ex_db.liveaction["id"])
        self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
+        mock_get_lock.side_effect = [
+            coordination.ToozConnectionError("foobar"),
+            coordination.ToozConnectionError("foobar"),
+            coord_svc.NoOpLock(name="noop"),
+            coord_svc.NoOpLock(name="noop"),
+            coord_svc.NoOpLock(name="noop"),
+            coord_svc.NoOpLock(name="noop"),
+            coord_svc.NoOpLock(name="noop"),
+        ]
        wf_svc.handle_action_execution_completion(tk1_ac_ex_db)

+        mock_get_lock.side_effect = coord_svc.NoOpLock(name="noop")
        # Workflow service should recover from retries and task1 should succeed.
        tk1_ex_db = wf_db_access.TaskExecution.get_by_id(tk1_ex_db.id)
        self.assertEqual(tk1_ex_db.status, wf_statuses.SUCCEEDED)

-    @mock.patch.object(
-        coord_svc.NoOpDriver,
-        "get_lock",
-        mock.MagicMock(side_effect=coordination.ToozConnectionError("foobar")),
-    )
-    def test_retries_exhausted_from_coordinator_connection_error(self):
+    @mock.patch.object(coord_svc.NoOpDriver, "get_lock")
+    def test_retries_exhausted_from_coordinator_connection_error(self, mock_get_lock):
+        mock_get_lock.side_effect = coord_svc.NoOpLock(name="noop")
        wf_meta = self.get_wf_fixture_meta_data(TEST_PACK_PATH, "sequential.yaml")
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta["name"])
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
@@ -189,6 +187,13 @@
        tk1_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk1_ac_ex_db.liveaction["id"])
        self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)

+        mock_get_lock.side_effect = [
+            coordination.ToozConnectionError("foobar"),
+            coordination.ToozConnectionError("foobar"),
+            coordination.ToozConnectionError("foobar"),
+            coordination.ToozConnectionError("foobar"),
+            coordination.ToozConnectionError("foobar"),
+        ]
        # The connection error should raise if retries are exhausted.
        self.assertRaises(
            coordination.ToozConnectionError,
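The two tests above pin down the retry contract around get_lock(): transient connection errors are retried a bounded number of times, after which the error propagates to the caller. A plain-Python sketch of that contract follows; st2's real retry logic lives in its workflow service, and the attempt count, backoff, and helper name here are illustrative:

    import time

    from tooz import coordination


    def get_lock_with_retries(coordinator, name, max_attempts=6, delay=0.1):
        # Retry transient coordinator connection errors, then re-raise once
        # the attempt budget is exhausted, the behavior both tests assert.
        for attempt in range(1, max_attempts + 1):
            try:
                return coordinator.get_lock(name)
            except coordination.ToozConnectionError:
                if attempt == max_attempts:
                    raise
                time.sleep(delay)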