From 14cf46cf805bba721aafa2f3ee50fc7022d9ebf4 Mon Sep 17 00:00:00 2001 From: Khushboo Date: Tue, 14 Sep 2021 10:31:03 +0530 Subject: [PATCH 01/11] Make update_execution() atomic --- st2common/st2common/services/executions.py | 66 +++++++++++----------- 1 file changed, 34 insertions(+), 32 deletions(-) diff --git a/st2common/st2common/services/executions.py b/st2common/st2common/services/executions.py index fe3b7bb591..38a72193de 100644 --- a/st2common/st2common/services/executions.py +++ b/st2common/st2common/services/executions.py @@ -48,6 +48,7 @@ from st2common.models.db.execution import ActionExecutionDB from st2common.runners import utils as runners_utils from st2common.metrics.base import Timer +from st2common.services import coordination from six.moves import range @@ -195,39 +196,40 @@ def update_execution(liveaction_db, publish=True, set_result_size=False): """ execution = ActionExecution.get(liveaction__id=str(liveaction_db.id)) - # Skip execution object update when action is already in completed state. - if execution.status in action_constants.LIVEACTION_COMPLETED_STATES: - LOG.debug( - "[%s] Action is already in completed state: %s. Skipping execution update to state: %s." - % (execution.id, execution.status, liveaction_db.status) - ) - return execution - - decomposed = _decompose_liveaction(liveaction_db) - - kw = {} - for k, v in six.iteritems(decomposed): - kw["set__" + k] = v - - if liveaction_db.status != execution.status: - # Note: If the status changes we store this transition in the "log" attribute of action - # execution - kw["push__log"] = _create_execution_log_entry(liveaction_db.status) - - if set_result_size: - # Sadly with the current ORM abstraction there is no better way to achieve updating - # result_size and we need to serialize the value again - luckily that operation is fast. 
- # To put things into perspective - on 4 MB result dictionary it only takes 7 ms which is - # negligible compared to other DB operations duration (and for smaller results it takes - # in sub ms range). - with Timer(key="action.executions.calculate_result_size"): - result_size = len( - ActionExecutionDB.result._serialize_field_value(liveaction_db.result) + with coordination.get_coordinator().get_lock(liveaction_db.id): + # Skip execution object update when action is already in completed state. + if execution.status in action_constants.LIVEACTION_COMPLETED_STATES: + LOG.debug( + "[%s] Action is already in completed state: %s. Skipping execution update to state: %s." + % (execution.id, execution.status, liveaction_db.status) ) - kw["set__result_size"] = result_size - - execution = ActionExecution.update(execution, publish=publish, **kw) - return execution + return execution + + decomposed = _decompose_liveaction(liveaction_db) + + kw = {} + for k, v in six.iteritems(decomposed): + kw["set__" + k] = v + + if liveaction_db.status != execution.status: + # Note: If the status changes we store this transition in the "log" attribute of action + # execution + kw["push__log"] = _create_execution_log_entry(liveaction_db.status) + + if set_result_size: + # Sadly with the current ORM abstraction there is no better way to achieve updating + # result_size and we need to serialize the value again - luckily that operation is fast. + # To put things into perspective - on 4 MB result dictionary it only takes 7 ms which is + # negligible compared to other DB operations duration (and for smaller results it takes + # in sub ms range). 
+ with Timer(key="action.executions.calculate_result_size"): + result_size = len( + ActionExecutionDB.result._serialize_field_value(liveaction_db.result) + ) + kw["set__result_size"] = result_size + + execution = ActionExecution.update(execution, publish=publish, **kw) + return execution def abandon_execution_if_incomplete(liveaction_id, publish=True): From 20045dc009f9168373e73e08d3e242985c5c45ed Mon Sep 17 00:00:00 2001 From: Khushboo Date: Tue, 14 Sep 2021 11:46:18 +0530 Subject: [PATCH 02/11] Black reformat --- st2common/st2common/services/executions.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/st2common/st2common/services/executions.py b/st2common/st2common/services/executions.py index 38a72193de..fe8de613b1 100644 --- a/st2common/st2common/services/executions.py +++ b/st2common/st2common/services/executions.py @@ -224,7 +224,9 @@ def update_execution(liveaction_db, publish=True, set_result_size=False): # in sub ms range). with Timer(key="action.executions.calculate_result_size"): result_size = len( - ActionExecutionDB.result._serialize_field_value(liveaction_db.result) + ActionExecutionDB.result._serialize_field_value( + liveaction_db.result + ) ) kw["set__result_size"] = result_size From a33c319e003344f5dc6f0103d3bad1a6ab19d735 Mon Sep 17 00:00:00 2001 From: Khushboo Date: Wed, 15 Sep 2021 18:21:50 +0530 Subject: [PATCH 03/11] Fix chunk 2 test cases --- st2actions/tests/unit/test_workflow_engine.py | 31 ++++++++++++++----- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/st2actions/tests/unit/test_workflow_engine.py b/st2actions/tests/unit/test_workflow_engine.py index b8e4fae83f..da36666bb3 100644 --- a/st2actions/tests/unit/test_workflow_engine.py +++ b/st2actions/tests/unit/test_workflow_engine.py @@ -152,12 +152,8 @@ def test_process(self): lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id)) self.assertEqual(lv_ac_db.status, action_constants.LIVEACTION_STATUS_SUCCEEDED) - @mock.patch.object( - 
coordination_service.NoOpDriver, - "get_lock", - mock.MagicMock(side_effect=coordination.ToozConnectionError("foobar")), - ) - def test_process_error_handling(self): + @mock.patch.object(coordination_service.NoOpDriver, "get_lock") + def test_process_error_handling(self, mock_get_lock): expected_errors = [ { "message": "Execution failed. See result for details.", @@ -172,6 +168,7 @@ def test_process_error_handling(self): }, ] + mock_get_lock.side_effect = coordination_service.NoOpLock(name="noop") wf_meta = self.get_wf_fixture_meta_data(TEST_PACK_PATH, "sequential.yaml") lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta["name"]) lv_ac_db, ac_ex_db = action_service.request(lv_ac_db) @@ -190,6 +187,15 @@ def test_process_error_handling(self): t1_ac_ex_db = ex_db_access.ActionExecution.query( task_execution=str(t1_ex_db.id) )[0] + mock_get_lock.side_effect = [ + coordination.ToozConnectionError("foobar"), + coordination.ToozConnectionError("foobar"), + coordination.ToozConnectionError("foobar"), + coordination.ToozConnectionError("foobar"), + coordination.ToozConnectionError("foobar"), + coordination_service.NoOpLock(name="noop"), + coordination_service.NoOpLock(name="noop"), + ] workflows.get_engine().process(t1_ac_ex_db) # Assert the task is marked as failed. 
@@ -206,14 +212,14 @@ def test_process_error_handling(self): @mock.patch.object( coordination_service.NoOpDriver, "get_lock", - mock.MagicMock(side_effect=coordination.ToozConnectionError("foobar")), ) @mock.patch.object( workflows.WorkflowExecutionHandler, "fail_workflow_execution", mock.MagicMock(side_effect=Exception("Unexpected error.")), ) - def test_process_error_handling_has_error(self): + def test_process_error_handling_has_error(self, mock_get_lock): + mock_get_lock.side_effect = coordination_service.NoOpLock(name="noop") wf_meta = self.get_wf_fixture_meta_data(TEST_PACK_PATH, "sequential.yaml") lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta["name"]) lv_ac_db, ac_ex_db = action_service.request(lv_ac_db) @@ -233,6 +239,15 @@ def test_process_error_handling_has_error(self): task_execution=str(t1_ex_db.id) )[0] + mock_get_lock.side_effect = [ + coordination.ToozConnectionError("foobar"), + coordination.ToozConnectionError("foobar"), + coordination.ToozConnectionError("foobar"), + coordination.ToozConnectionError("foobar"), + coordination.ToozConnectionError("foobar"), + coordination_service.NoOpLock(name="noop"), + coordination_service.NoOpLock(name="noop"), + ] self.assertRaisesRegexp( Exception, "Unexpected error.", workflows.get_engine().process, t1_ac_ex_db ) From 7c42aad189182063295af60afcca39f9b3cd0426 Mon Sep 17 00:00:00 2001 From: Khushboo Date: Wed, 15 Sep 2021 18:53:41 +0530 Subject: [PATCH 04/11] Fix chunk 2 unit test cases --- .../services/test_workflow_service_retries.py | 41 +++++++++++-------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/st2common/tests/unit/services/test_workflow_service_retries.py b/st2common/tests/unit/services/test_workflow_service_retries.py index 8f33130d34..0d6edc9cc9 100644 --- a/st2common/tests/unit/services/test_workflow_service_retries.py +++ b/st2common/tests/unit/services/test_workflow_service_retries.py @@ -134,18 +134,9 @@ def setUpClass(cls): for pack in PACKS: 
actions_registrar.register_from_pack(pack) - @mock.patch.object( - coord_svc.NoOpDriver, - "get_lock", - mock.MagicMock( - side_effect=[ - coordination.ToozConnectionError("foobar"), - coordination.ToozConnectionError("fubar"), - coord_svc.NoOpLock(name="noop"), - ] - ), - ) - def test_recover_from_coordinator_connection_error(self): + @mock.patch.object(coord_svc.NoOpDriver, "get_lock") + def test_recover_from_coordinator_connection_error(self, mock_get_lock): + mock_get_lock.side_effect = coord_svc.NoOpLock(name="noop") wf_meta = self.get_wf_fixture_meta_data(TEST_PACK_PATH, "sequential.yaml") lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta["name"]) lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db) @@ -161,18 +152,25 @@ def test_recover_from_coordinator_connection_error(self): )[0] tk1_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk1_ac_ex_db.liveaction["id"]) self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED) + mock_get_lock.side_effect = [ + coordination.ToozConnectionError("foobar"), + coordination.ToozConnectionError("foobar"), + coord_svc.NoOpLock(name="noop"), + coord_svc.NoOpLock(name="noop"), + coord_svc.NoOpLock(name="noop"), + coord_svc.NoOpLock(name="noop"), + coord_svc.NoOpLock(name="noop"), + ] wf_svc.handle_action_execution_completion(tk1_ac_ex_db) + mock_get_lock.side_effect = coord_svc.NoOpLock(name="noop") # Workflow service should recover from retries and task1 should succeed. 
tk1_ex_db = wf_db_access.TaskExecution.get_by_id(tk1_ex_db.id) self.assertEqual(tk1_ex_db.status, wf_statuses.SUCCEEDED) - @mock.patch.object( - coord_svc.NoOpDriver, - "get_lock", - mock.MagicMock(side_effect=coordination.ToozConnectionError("foobar")), - ) - def test_retries_exhausted_from_coordinator_connection_error(self): + @mock.patch.object(coord_svc.NoOpDriver, "get_lock") + def test_retries_exhausted_from_coordinator_connection_error(self, mock_get_lock): + mock_get_lock.side_effect = coord_svc.NoOpLock(name="noop") wf_meta = self.get_wf_fixture_meta_data(TEST_PACK_PATH, "sequential.yaml") lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta["name"]) lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db) @@ -189,6 +187,13 @@ def test_retries_exhausted_from_coordinator_connection_error(self): tk1_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk1_ac_ex_db.liveaction["id"]) self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED) + mock_get_lock.side_effect = [ + coordination.ToozConnectionError("foobar"), + coordination.ToozConnectionError("foobar"), + coordination.ToozConnectionError("foobar"), + coordination.ToozConnectionError("foobar"), + coordination.ToozConnectionError("foobar"), + ] # The connection error should raise if retries are exhaused. 
self.assertRaises( coordination.ToozConnectionError, From 1d12cfe3ae349195de2197835f9b7a598fdb3a58 Mon Sep 17 00:00:00 2001 From: Khushboo Date: Wed, 15 Sep 2021 20:30:26 +0530 Subject: [PATCH 05/11] Retrigger CI From 67cc8297223ad1d631be7393e11fa4a4274f0d37 Mon Sep 17 00:00:00 2001 From: Khushboo Date: Thu, 16 Sep 2021 17:04:54 +0530 Subject: [PATCH 06/11] Retrigger CI From d6a999ccefecf5db9e69e4847237c6612a3afb65 Mon Sep 17 00:00:00 2001 From: Khushboo Date: Thu, 23 Sep 2021 14:37:20 +0530 Subject: [PATCH 07/11] Retrigger CI From 3ce5819a1663816874e65017bd82f0d3be20f86f Mon Sep 17 00:00:00 2001 From: Khushboo Date: Thu, 23 Sep 2021 17:11:17 +0530 Subject: [PATCH 08/11] Retrigger CI From 4eb19d5b06f26364018d0f48e5afa1cfb443fbad Mon Sep 17 00:00:00 2001 From: Khushboo Date: Tue, 5 Oct 2021 11:40:16 +0530 Subject: [PATCH 09/11] Fix test case --- st2actions/tests/unit/test_workflow_engine.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/st2actions/tests/unit/test_workflow_engine.py b/st2actions/tests/unit/test_workflow_engine.py index da36666bb3..2b95f954ea 100644 --- a/st2actions/tests/unit/test_workflow_engine.py +++ b/st2actions/tests/unit/test_workflow_engine.py @@ -244,9 +244,7 @@ def test_process_error_handling_has_error(self, mock_get_lock): coordination.ToozConnectionError("foobar"), coordination.ToozConnectionError("foobar"), coordination.ToozConnectionError("foobar"), - coordination.ToozConnectionError("foobar"), - coordination_service.NoOpLock(name="noop"), - coordination_service.NoOpLock(name="noop"), + coordination.ToozConnectionError("foobar") ] self.assertRaisesRegexp( Exception, "Unexpected error.", workflows.get_engine().process, t1_ac_ex_db @@ -255,6 +253,7 @@ def test_process_error_handling_has_error(self, mock_get_lock): self.assertTrue( workflows.WorkflowExecutionHandler.fail_workflow_execution.called ) + mock_get_lock.side_effect = coordination_service.NoOpLock(name="noop") # Since error handling failed, the workflow 
will have status of running. wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(wf_ex_db.id) From 44f691e993cb92bb692cc28d0ae5576cc549385b Mon Sep 17 00:00:00 2001 From: Khushboo Date: Tue, 5 Oct 2021 11:50:51 +0530 Subject: [PATCH 10/11] Reformat --- st2actions/tests/unit/test_workflow_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/st2actions/tests/unit/test_workflow_engine.py b/st2actions/tests/unit/test_workflow_engine.py index 2b95f954ea..7c572e7ebb 100644 --- a/st2actions/tests/unit/test_workflow_engine.py +++ b/st2actions/tests/unit/test_workflow_engine.py @@ -244,7 +244,7 @@ def test_process_error_handling_has_error(self, mock_get_lock): coordination.ToozConnectionError("foobar"), coordination.ToozConnectionError("foobar"), coordination.ToozConnectionError("foobar"), - coordination.ToozConnectionError("foobar") + coordination.ToozConnectionError("foobar"), ] self.assertRaisesRegexp( Exception, "Unexpected error.", workflows.get_engine().process, t1_ac_ex_db From ec52b8937912ca1f9dd6a9494ac61c8b3ef641a6 Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Tue, 5 Oct 2021 15:11:43 -0500 Subject: [PATCH 11/11] add changelog entry --- CHANGELOG.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index ac8c8beb8e..e880a3269b 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -77,6 +77,10 @@ Fixed * Correct error reported when encrypted key value is reported, and another key value parameter that requires conversion is present. #5328 Contributed by @amanda11, Ammeon Solutions +* Make ``update_execution()`` atomic by protecting the update with a coordination lock. Actions, like workflows, may have multiple + concurrent updates to their execution state. This makes those updates safer, which should make the execution status more reliable. #5358 + + Contributed by @khushboobhatia01 3.5.0 - June 23, 2021