diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index ac8c8beb8e..e880a3269b 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -77,6 +77,10 @@ Fixed
 * Correct error reported when encrypted key value is reported, and another key value parameter that requires conversion is present. #5328
   Contributed by @amanda11, Ammeon Solutions
+* Make ``update_execution()`` atomic by protecting the update with a coordination lock. Actions, workflow actions in particular,
+  may receive multiple concurrent updates to their execution state; serializing those updates makes the reported execution status more reliable. #5358
+
+  Contributed by @khushboobhatia01
 
 3.5.0 - June 23, 2021
diff --git a/st2actions/tests/unit/test_workflow_engine.py b/st2actions/tests/unit/test_workflow_engine.py
index b8e4fae83f..7c572e7ebb 100644
--- a/st2actions/tests/unit/test_workflow_engine.py
+++ b/st2actions/tests/unit/test_workflow_engine.py
@@ -152,12 +152,8 @@ def test_process(self):
         lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
         self.assertEqual(lv_ac_db.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
 
-    @mock.patch.object(
-        coordination_service.NoOpDriver,
-        "get_lock",
-        mock.MagicMock(side_effect=coordination.ToozConnectionError("foobar")),
-    )
-    def test_process_error_handling(self):
+    @mock.patch.object(coordination_service.NoOpDriver, "get_lock")
+    def test_process_error_handling(self, mock_get_lock):
         expected_errors = [
             {
                 "message": "Execution failed. See result for details.",
@@ -172,6 +168,7 @@ def test_process_error_handling(self):
             },
         ]
 
+        mock_get_lock.side_effect = coordination_service.NoOpLock(name="noop")
         wf_meta = self.get_wf_fixture_meta_data(TEST_PACK_PATH, "sequential.yaml")
         lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta["name"])
         lv_ac_db, ac_ex_db = action_service.request(lv_ac_db)
@@ -190,6 +187,15 @@ def test_process_error_handling(self):
         t1_ac_ex_db = ex_db_access.ActionExecution.query(
             task_execution=str(t1_ex_db.id)
         )[0]
+        mock_get_lock.side_effect = [
+            coordination.ToozConnectionError("foobar"),
+            coordination.ToozConnectionError("foobar"),
+            coordination.ToozConnectionError("foobar"),
+            coordination.ToozConnectionError("foobar"),
+            coordination.ToozConnectionError("foobar"),
+            coordination_service.NoOpLock(name="noop"),
+            coordination_service.NoOpLock(name="noop"),
+        ]
         workflows.get_engine().process(t1_ac_ex_db)
 
         # Assert the task is marked as failed.
@@ -206,14 +212,14 @@ def test_process_error_handling(self):
     @mock.patch.object(
         coordination_service.NoOpDriver,
         "get_lock",
-        mock.MagicMock(side_effect=coordination.ToozConnectionError("foobar")),
     )
     @mock.patch.object(
         workflows.WorkflowExecutionHandler,
         "fail_workflow_execution",
         mock.MagicMock(side_effect=Exception("Unexpected error.")),
     )
-    def test_process_error_handling_has_error(self):
+    def test_process_error_handling_has_error(self, mock_get_lock):
+        mock_get_lock.side_effect = coordination_service.NoOpLock(name="noop")
         wf_meta = self.get_wf_fixture_meta_data(TEST_PACK_PATH, "sequential.yaml")
         lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta["name"])
         lv_ac_db, ac_ex_db = action_service.request(lv_ac_db)
@@ -233,6 +239,13 @@ def test_process_error_handling_has_error(self):
             task_execution=str(t1_ex_db.id)
         )[0]
 
+        mock_get_lock.side_effect = [
+            coordination.ToozConnectionError("foobar"),
+            coordination.ToozConnectionError("foobar"),
+            coordination.ToozConnectionError("foobar"),
+            coordination.ToozConnectionError("foobar"),
+            coordination.ToozConnectionError("foobar"),
+        ]
         self.assertRaisesRegexp(
             Exception, "Unexpected error.", workflows.get_engine().process, t1_ac_ex_db
         )
@@ -240,6 +253,7 @@ def test_process_error_handling_has_error(self):
         self.assertTrue(
             workflows.WorkflowExecutionHandler.fail_workflow_execution.called
         )
+        mock_get_lock.side_effect = coordination_service.NoOpLock(name="noop")
 
         # Since error handling failed, the workflow will have status of running.
         wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(wf_ex_db.id)
diff --git a/st2common/st2common/services/executions.py b/st2common/st2common/services/executions.py
index fe3b7bb591..fe8de613b1 100644
--- a/st2common/st2common/services/executions.py
+++ b/st2common/st2common/services/executions.py
@@ -48,6 +48,7 @@
 from st2common.models.db.execution import ActionExecutionDB
 from st2common.runners import utils as runners_utils
 from st2common.metrics.base import Timer
+from st2common.services import coordination
 
 from six.moves import range
 
@@ -195,39 +196,42 @@ def update_execution(liveaction_db, publish=True, set_result_size=False):
     """
     execution = ActionExecution.get(liveaction__id=str(liveaction_db.id))
 
-    # Skip execution object update when action is already in completed state.
-    if execution.status in action_constants.LIVEACTION_COMPLETED_STATES:
-        LOG.debug(
-            "[%s] Action is already in completed state: %s. Skipping execution update to state: %s."
-            % (execution.id, execution.status, liveaction_db.status)
-        )
-        return execution
-
-    decomposed = _decompose_liveaction(liveaction_db)
-
-    kw = {}
-    for k, v in six.iteritems(decomposed):
-        kw["set__" + k] = v
-
-    if liveaction_db.status != execution.status:
-        # Note: If the status changes we store this transition in the "log" attribute of action
-        # execution
-        kw["push__log"] = _create_execution_log_entry(liveaction_db.status)
-
-    if set_result_size:
-        # Sadly with the current ORM abstraction there is no better way to achieve updating
-        # result_size and we need to serialize the value again - luckily that operation is fast.
-        # To put things into perspective - on 4 MB result dictionary it only takes 7 ms which is
-        # negligible compared to other DB operations duration (and for smaller results it takes
-        # in sub ms range).
-        with Timer(key="action.executions.calculate_result_size"):
-            result_size = len(
-                ActionExecutionDB.result._serialize_field_value(liveaction_db.result)
-            )
-            kw["set__result_size"] = result_size
-
-    execution = ActionExecution.update(execution, publish=publish, **kw)
-    return execution
+    with coordination.get_coordinator().get_lock(liveaction_db.id):
+        # Skip execution object update when action is already in completed state.
+        if execution.status in action_constants.LIVEACTION_COMPLETED_STATES:
+            LOG.debug(
+                "[%s] Action is already in completed state: %s. Skipping execution update to state: %s."
+                % (execution.id, execution.status, liveaction_db.status)
+            )
+            return execution
+
+        decomposed = _decompose_liveaction(liveaction_db)
+
+        kw = {}
+        for k, v in six.iteritems(decomposed):
+            kw["set__" + k] = v
+
+        if liveaction_db.status != execution.status:
+            # Note: If the status changes we store this transition in the "log" attribute of action
+            # execution
+            kw["push__log"] = _create_execution_log_entry(liveaction_db.status)
+
+        if set_result_size:
+            # Sadly with the current ORM abstraction there is no better way to achieve updating
+            # result_size and we need to serialize the value again - luckily that operation is fast.
+            # To put things into perspective - on 4 MB result dictionary it only takes 7 ms which is
+            # negligible compared to other DB operations duration (and for smaller results it takes
+            # in sub ms range).
+            with Timer(key="action.executions.calculate_result_size"):
+                result_size = len(
+                    ActionExecutionDB.result._serialize_field_value(
+                        liveaction_db.result
+                    )
+                )
+                kw["set__result_size"] = result_size
+
+        execution = ActionExecution.update(execution, publish=publish, **kw)
+        return execution
 
 
 def abandon_execution_if_incomplete(liveaction_id, publish=True):
diff --git a/st2common/tests/unit/services/test_workflow_service_retries.py b/st2common/tests/unit/services/test_workflow_service_retries.py
index 8f33130d34..0d6edc9cc9 100644
--- a/st2common/tests/unit/services/test_workflow_service_retries.py
+++ b/st2common/tests/unit/services/test_workflow_service_retries.py
@@ -134,18 +134,9 @@ def setUpClass(cls):
         for pack in PACKS:
             actions_registrar.register_from_pack(pack)
 
-    @mock.patch.object(
-        coord_svc.NoOpDriver,
-        "get_lock",
-        mock.MagicMock(
-            side_effect=[
-                coordination.ToozConnectionError("foobar"),
-                coordination.ToozConnectionError("fubar"),
-                coord_svc.NoOpLock(name="noop"),
-            ]
-        ),
-    )
-    def test_recover_from_coordinator_connection_error(self):
+    @mock.patch.object(coord_svc.NoOpDriver, "get_lock")
+    def test_recover_from_coordinator_connection_error(self, mock_get_lock):
+        mock_get_lock.side_effect = coord_svc.NoOpLock(name="noop")
         wf_meta = self.get_wf_fixture_meta_data(TEST_PACK_PATH, "sequential.yaml")
         lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta["name"])
         lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
@@ -161,18 +152,25 @@ def test_recover_from_coordinator_connection_error(self):
         )[0]
         tk1_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk1_ac_ex_db.liveaction["id"])
         self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
+        mock_get_lock.side_effect = [
+            coordination.ToozConnectionError("foobar"),
+            coordination.ToozConnectionError("foobar"),
+            coord_svc.NoOpLock(name="noop"),
+            coord_svc.NoOpLock(name="noop"),
+            coord_svc.NoOpLock(name="noop"),
+            coord_svc.NoOpLock(name="noop"),
+            coord_svc.NoOpLock(name="noop"),
+        ]
         wf_svc.handle_action_execution_completion(tk1_ac_ex_db)
+        mock_get_lock.side_effect = coord_svc.NoOpLock(name="noop")
 
         # Workflow service should recover from retries and task1 should succeed.
         tk1_ex_db = wf_db_access.TaskExecution.get_by_id(tk1_ex_db.id)
         self.assertEqual(tk1_ex_db.status, wf_statuses.SUCCEEDED)
 
-    @mock.patch.object(
-        coord_svc.NoOpDriver,
-        "get_lock",
-        mock.MagicMock(side_effect=coordination.ToozConnectionError("foobar")),
-    )
-    def test_retries_exhausted_from_coordinator_connection_error(self):
+    @mock.patch.object(coord_svc.NoOpDriver, "get_lock")
+    def test_retries_exhausted_from_coordinator_connection_error(self, mock_get_lock):
+        mock_get_lock.side_effect = coord_svc.NoOpLock(name="noop")
         wf_meta = self.get_wf_fixture_meta_data(TEST_PACK_PATH, "sequential.yaml")
         lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta["name"])
         lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
@@ -189,6 +187,13 @@ def test_retries_exhausted_from_coordinator_connection_error(self):
         tk1_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk1_ac_ex_db.liveaction["id"])
         self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
 
+        mock_get_lock.side_effect = [
+            coordination.ToozConnectionError("foobar"),
+            coordination.ToozConnectionError("foobar"),
+            coordination.ToozConnectionError("foobar"),
+            coordination.ToozConnectionError("foobar"),
+            coordination.ToozConnectionError("foobar"),
+        ]
         # The connection error should be raised if retries are exhausted.
         self.assertRaises(
            coordination.ToozConnectionError,
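
The changelog entry at the top describes the pattern that ``update_execution()`` now follows: take a per-execution lock from the coordination backend, re-check the execution state under the lock, and only then write. A minimal, runnable sketch of that pattern, using tooz directly; the ``file://`` backend URL, member id, and the in-memory ``executions`` dict are illustrative stand-ins for st2's configured coordinator and ORM, not st2 APIs:

    from tooz import coordination

    # Stand-in coordinator; st2 resolves this from its coordination backend config.
    coordinator = coordination.get_coordinator("file:///tmp/demo-locks", b"demo-member")
    coordinator.start()

    # Stand-in for the ActionExecution collection.
    executions = {"abc123": {"status": "running", "log": []}}

    COMPLETED_STATES = ("succeeded", "failed", "canceled")


    def update_execution(execution_id, new_status):
        # One lock per execution id: updates to the same execution serialize,
        # while updates to different executions still proceed concurrently.
        with coordinator.get_lock(execution_id.encode("utf-8")):
            execution = executions[execution_id]
            # Re-check state under the lock; another writer may have completed
            # the execution between our earlier read and this update.
            if execution["status"] in COMPLETED_STATES:
                return execution
            execution["log"].append(new_status)
            execution["status"] = new_status
            return execution


    print(update_execution("abc123", "succeeded"))
    coordinator.stop()

The early return inside the ``with`` block mirrors the moved completed-state check in the diff: the check must happen after the lock is acquired, otherwise two writers could both pass it and race on the final write.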
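The test changes above all rely on one ``unittest.mock`` feature: assigning a sequence to ``side_effect`` makes successive ``get_lock()`` calls raise transient connection errors and then succeed, which exercises the caller's retry logic. A self-contained sketch of that technique; ``FakeConnectionError``, ``call_with_retries``, and the retry count are hypothetical stand-ins rather than st2 or tooz APIs:

    from unittest import mock


    class FakeConnectionError(Exception):
        """Stands in for coordination.ToozConnectionError."""


    def call_with_retries(fn, retries=3):
        # Simplified version of the retry behavior the workflow service tests expect.
        for attempt in range(retries):
            try:
                return fn()
            except FakeConnectionError:
                if attempt == retries - 1:
                    raise


    locker = mock.MagicMock()
    # The first two calls raise; the third returns a usable value, so a caller
    # that retries at least twice recovers, and one that retries less does not.
    locker.get_lock.side_effect = [
        FakeConnectionError("foobar"),
        FakeConnectionError("foobar"),
        "the-lock",  # the real tests return coordination_service.NoOpLock here
    ]

    assert call_with_retries(locker.get_lock) == "the-lock"
    print("recovered after two transient failures")

This is also why the tests reset ``side_effect`` between stages: once the list is exhausted, further calls would raise ``StopIteration``, so each stage installs the sequence (or a plain ``NoOpLock``) it actually expects.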