From 6271014cf380ff614b8e9a3ab04d3b214421fa54 Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Tue, 30 Jul 2019 21:08:02 +0200 Subject: [PATCH 1/6] Fix and combine integration tests targets. Make sure we only generate coverage for runners and orquesta integration tests when ENABLE_COVERAGE environment variable is set to "yes". Coverage adds a lot of overhead so this should speed up PR builds. --- Makefile | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 08fec2c261..b7f4a3780a 100644 --- a/Makefile +++ b/Makefile @@ -665,6 +665,28 @@ endif echo "Done running tests in" $$component; \ echo "==========================================================="; \ done + @echo + @echo "============== runners integration tests with coverage ==============" + @echo + @echo "The tests assume st2 is running on 127.0.0.1." + @for component in $(COMPONENTS_RUNNERS); do\ + echo "==========================================================="; \ + echo "Running tests in" $$component; \ + echo "==========================================================="; \ + . $(VIRTUALENV_DIR)/bin/activate; \ + COVERAGE_FILE=.coverage.integration.$$(echo $$component | tr '/' '.') \ + nosetests $(NOSE_OPTS) -s -v \ + $(NOSE_COVERAGE_FLAGS) $(NOSE_COVERAGE_PACKAGES) $$component/tests/integration || exit 1; \ + done + @echo + @echo "==================== Orquesta integration tests with coverage (HTML reports) ====================" + @echo "The tests assume st2 is running on 127.0.0.1." + @echo + . $(VIRTUALENV_DIR)/bin/activate; \ + COVERAGE_FILE=.coverage.integration.orquesta \ + nosetests $(NOSE_OPTS) -s -v \ + $(NOSE_COVERAGE_FLAGS) $(NOSE_COVERAGE_PACKAGES) st2tests/integration/orquesta || exit 1; \ + .PHONY: .combine-integration-tests-coverage .combine-integration-tests-coverage: .run-integration-tests-coverage @@ -960,7 +982,7 @@ ci-unit: .unit-tests-coverage-html sudo -E ./scripts/travis/prepare-integration.sh .PHONY: ci-integration -ci-integration: .ci-prepare-integration .itests-coverage-html .runners-itests-coverage-html .orquesta-itests-coverage-html +ci-integration: .ci-prepare-integration .itests-coverage-html .PHONY: ci-runners ci-runners: .ci-prepare-integration .runners-itests-coverage-html From ad37d686c8c9db283106cd6fcbde90fe92656736 Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Tue, 30 Jul 2019 22:22:19 +0200 Subject: [PATCH 2/6] Try to decrease wait delay, see if that helps. --- st2tests/integration/orquesta/base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/st2tests/integration/orquesta/base.py b/st2tests/integration/orquesta/base.py index dcb4206260..697a700b1c 100644 --- a/st2tests/integration/orquesta/base.py +++ b/st2tests/integration/orquesta/base.py @@ -84,7 +84,7 @@ def _execute_workflow(self, action, parameters=None, execute_async=True, @retrying.retry( retry_on_exception=retry_on_exceptions, - wait_fixed=3000, stop_max_delay=900000) + wait_fixed=500, stop_max_delay=900000) def _wait_for_state(self, ex, states): if isinstance(states, six.string_types): states = [states] @@ -113,7 +113,7 @@ def _get_children(self, ex): @retrying.retry( retry_on_exception=retry_on_exceptions, - wait_fixed=3000, stop_max_delay=900000) + wait_fixed=500, stop_max_delay=900000) def _wait_for_task(self, ex, task, status=None, num_task_exs=1): ex = self.st2client.executions.get_by_id(ex.id) From 4d0f12bee3d1a3329397035030bfa4e100feed1b Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Wed, 31 Jul 2019 12:06:18 +0200 Subject: [PATCH 3/6] Add a work around for two tests which rely on longer retry delay. --- .../integration/orquesta/test_wiring_error_handling.py | 7 +++++++ .../integration/orquesta/test_wiring_pause_and_resume.py | 7 +++++++ 2 files changed, 14 insertions(+) diff --git a/st2tests/integration/orquesta/test_wiring_error_handling.py b/st2tests/integration/orquesta/test_wiring_error_handling.py index 7f70dad869..be49756b31 100644 --- a/st2tests/integration/orquesta/test_wiring_error_handling.py +++ b/st2tests/integration/orquesta/test_wiring_error_handling.py @@ -14,6 +14,7 @@ from __future__ import absolute_import +import eventlet from integration.orquesta import base from st2common.constants import action as ac_const @@ -244,6 +245,12 @@ def test_remediate_then_fail(self): ex = self._wait_for_completion(ex) # Assert that the log task is executed. + # NOTE: There is a race wheen execution gets in a desired state, but before the child + # tasks are written. To avoid that, we use longer sleep delay here. + # Better approach would be to try to retry a couple of times until expected num of + # tasks is reached (With some hard limit) before failing + eventlet.sleep(2) + self._wait_for_task(ex, 'task1', ac_const.LIVEACTION_STATUS_FAILED) self._wait_for_task(ex, 'log', ac_const.LIVEACTION_STATUS_SUCCEEDED) diff --git a/st2tests/integration/orquesta/test_wiring_pause_and_resume.py b/st2tests/integration/orquesta/test_wiring_pause_and_resume.py index 6e8a3abe7e..a2bd3ec69f 100644 --- a/st2tests/integration/orquesta/test_wiring_pause_and_resume.py +++ b/st2tests/integration/orquesta/test_wiring_pause_and_resume.py @@ -15,6 +15,7 @@ from __future__ import absolute_import import os +import eventlet from integration.orquesta import base @@ -177,6 +178,12 @@ def test_pause_and_resume_cascade_from_subworkflow(self): self.assertFalse(os.path.exists(path)) # Wait for the workflow and task to be paused. + # NOTE: There is a race wheen execution gets in a desired state, but before the child + # tasks are written. To avoid that, we use longer sleep delay here. + # Better approach would be to try to retry a couple of times until expected num of + # tasks is reached (With some hard limit) before failing + eventlet.sleep(3) + tk_ac_ex = self._wait_for_state(tk_ac_ex, ac_const.LIVEACTION_STATUS_PAUSED) ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_PAUSED) From 14058b60274dfcd682725ffdca0d687e9089a951 Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Wed, 31 Jul 2019 13:20:07 +0200 Subject: [PATCH 4/6] Allow wait_fixed and stop_max_delay to be provided on per method invocation basis. Update tests which have a race / rely on timing to use longer wait time to avoid failure. --- st2tests/integration/orquesta/base.py | 148 ++++++++++-------- .../orquesta/test_wiring_pause_and_resume.py | 76 +++++---- 2 files changed, 127 insertions(+), 97 deletions(-) diff --git a/st2tests/integration/orquesta/base.py b/st2tests/integration/orquesta/base.py index 697a700b1c..b7076fd4ce 100644 --- a/st2tests/integration/orquesta/base.py +++ b/st2tests/integration/orquesta/base.py @@ -32,6 +32,9 @@ action_constants.LIVEACTION_STATUS_RUNNING ] +DEFAULT_WAIT_FIXED = 3000 +DEFAULT_STOP_MAX_DELAY = 900000 + def retry_on_exceptions(exc): return isinstance(exc, AssertionError) @@ -82,86 +85,99 @@ def _execute_workflow(self, action, parameters=None, execute_async=True, return ex - @retrying.retry( - retry_on_exception=retry_on_exceptions, - wait_fixed=500, stop_max_delay=900000) - def _wait_for_state(self, ex, states): - if isinstance(states, six.string_types): - states = [states] + def _wait_for_state(self, ex, states, wait_fixed=DEFAULT_WAIT_FIXED, + stop_max_delay=DEFAULT_STOP_MAX_DELAY): - for state in states: - if state not in action_constants.LIVEACTION_STATUSES: - raise ValueError('Status %s is not valid.' % state) + @retrying.retry( + retry_on_exception=retry_on_exceptions, + wait_fixed=wait_fixed, stop_max_delay=stop_max_delay) + def inner(ex, states): + if isinstance(states, six.string_types): + states = [states] - try: - ex = self.st2client.executions.get_by_id(ex.id) - self.assertIn(ex.status, states) - except: - if ex.status in action_constants.LIVEACTION_COMPLETED_STATES: - raise Exception( - 'Execution is in completed state "%s" and ' - 'does not match expected state(s). %s' % - (ex.status, ex.result) - ) - else: - raise + for state in states: + if state not in action_constants.LIVEACTION_STATUSES: + raise ValueError('Status %s is not valid.' % state) - return ex + try: + ex = self.st2client.executions.get_by_id(ex.id) + self.assertIn(ex.status, states) + except: + if ex.status in action_constants.LIVEACTION_COMPLETED_STATES: + raise Exception( + 'Execution is in completed state "%s" and ' + 'does not match expected state(s). %s' % + (ex.status, ex.result) + ) + else: + raise + + return ex + return inner(ex=ex, states=states) def _get_children(self, ex): return self.st2client.executions.query(parent=ex.id) - @retrying.retry( - retry_on_exception=retry_on_exceptions, - wait_fixed=500, stop_max_delay=900000) - def _wait_for_task(self, ex, task, status=None, num_task_exs=1): - ex = self.st2client.executions.get_by_id(ex.id) - - task_exs = [ - task_ex for task_ex in self._get_children(ex) - if task_ex.context.get('orquesta', {}).get('task_name', '') == task - ] - - try: - self.assertEqual(len(task_exs), num_task_exs) - except: - if ex.status in action_constants.LIVEACTION_COMPLETED_STATES: - raise Exception( - 'Execution is in completed state and does not match expected number of ' - 'tasks. Expected: %s Actual: %s' % (str(num_task_exs), str(len(task_exs))) - ) - else: - raise + def _wait_for_task(self, ex, task, status=None, num_task_exs=1, + wait_fixed=DEFAULT_WAIT_FIXED, + stop_max_delay=DEFAULT_STOP_MAX_DELAY): + + @retrying.retry( + retry_on_exception=retry_on_exceptions, + wait_fixed=wait_fixed, stop_max_delay=stop_max_delay) + def inner(ex, task, status, num_task_exs): + ex = self.st2client.executions.get_by_id(ex.id) + + task_exs = [ + task_ex for task_ex in self._get_children(ex) + if task_ex.context.get('orquesta', {}).get('task_name', '') == task + ] - if status is not None: try: - self.assertTrue(all([task_ex.status == status for task_ex in task_exs])) + self.assertEqual(len(task_exs), num_task_exs) except: if ex.status in action_constants.LIVEACTION_COMPLETED_STATES: raise Exception( - 'Execution is in completed state and not all tasks ' - 'match expected status "%s".' % status + 'Execution is in completed state and does not match expected number of ' + 'tasks. Expected: %s Actual: %s' % (str(num_task_exs), str(len(task_exs))) ) else: raise - return task_exs - - @retrying.retry( - retry_on_exception=retry_on_exceptions, - wait_fixed=3000, stop_max_delay=900000) - def _wait_for_completion(self, ex): - ex = self._wait_for_state(ex, action_constants.LIVEACTION_COMPLETED_STATES) - - try: - self.assertTrue(hasattr(ex, 'result')) - except: - if ex.status in action_constants.LIVEACTION_COMPLETED_STATES: - raise Exception( - 'Execution is in completed state and does not ' - 'contain expected result.' - ) - else: - raise + if status is not None: + try: + self.assertTrue(all([task_ex.status == status for task_ex in task_exs])) + except: + if ex.status in action_constants.LIVEACTION_COMPLETED_STATES: + raise Exception( + 'Execution is in completed state and not all tasks ' + 'match expected status "%s".' % status + ) + else: + raise + + return task_exs + return inner(ex=ex, task=task, status=status, num_task_exs=num_task_exs) + + def _wait_for_completion(self, ex, wait_fixed=DEFAULT_WAIT_FIXED, + stop_max_delay=DEFAULT_STOP_MAX_DELAY): + + @retrying.retry( + retry_on_exception=retry_on_exceptions, + wait_fixed=wait_fixed, stop_max_delay=stop_max_delay) + def inner(ex): + ex = self._wait_for_state(ex, action_constants.LIVEACTION_COMPLETED_STATES) - return ex + try: + self.assertTrue(hasattr(ex, 'result')) + except: + if ex.status in action_constants.LIVEACTION_COMPLETED_STATES: + raise Exception( + 'Execution is in completed state and does not ' + 'contain expected result.' + ) + else: + raise + + return ex + return inner(ex=ex) diff --git a/st2tests/integration/orquesta/test_wiring_pause_and_resume.py b/st2tests/integration/orquesta/test_wiring_pause_and_resume.py index a2bd3ec69f..08aaadbc1f 100644 --- a/st2tests/integration/orquesta/test_wiring_pause_and_resume.py +++ b/st2tests/integration/orquesta/test_wiring_pause_and_resume.py @@ -15,7 +15,6 @@ from __future__ import absolute_import import os -import eventlet from integration.orquesta import base @@ -162,38 +161,39 @@ def test_pause_and_resume_cascade_from_subworkflow(self): # Launch the workflow. The workflow will wait for the temp file to be deleted. params = {'tempfile': path} ex = self._execute_workflow('examples.orquesta-test-pause-subworkflow', params) - ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_RUNNING) - tk_exs = self._wait_for_task(ex, 'task1', ac_const.LIVEACTION_STATUS_RUNNING) + + ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_RUNNING, wait_fixed=1500) + tk_exs = self._wait_for_task(ex, 'task1', ac_const.LIVEACTION_STATUS_RUNNING, + wait_fixed=1500) # Pause the subworkflow before the temp file is deleted. The task will be # paused but workflow will still be running. tk_ac_ex = self.st2client.executions.pause(tk_exs[0].id) # Expecting the workflow is still running and task1 is pausing. - ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_RUNNING) - tk_ac_ex = self._wait_for_state(tk_ac_ex, ac_const.LIVEACTION_STATUS_PAUSING) + ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_RUNNING, + wait_fixed=1500) + tk_ac_ex = self._wait_for_state(tk_ac_ex, ac_const.LIVEACTION_STATUS_PAUSING, + wait_fixed=1500) # Delete the temporary file. os.remove(path) self.assertFalse(os.path.exists(path)) - # Wait for the workflow and task to be paused. - # NOTE: There is a race wheen execution gets in a desired state, but before the child - # tasks are written. To avoid that, we use longer sleep delay here. - # Better approach would be to try to retry a couple of times until expected num of - # tasks is reached (With some hard limit) before failing - eventlet.sleep(3) - - tk_ac_ex = self._wait_for_state(tk_ac_ex, ac_const.LIVEACTION_STATUS_PAUSED) - ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_PAUSED) + tk_ac_ex = self._wait_for_state(tk_ac_ex, ac_const.LIVEACTION_STATUS_PAUSED, + wait_fixed=1500) + ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_PAUSED, + wait_fixed=1500) # Resume the task. tk_ac_ex = self.st2client.executions.resume(tk_ac_ex.id) # Wait for completion. - tk_ac_ex = self._wait_for_state(tk_ac_ex, ac_const.LIVEACTION_STATUS_SUCCEEDED) - ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_SUCCEEDED) - + tk_ac_ex = self._wait_for_state(tk_ac_ex, ac_const.LIVEACTION_STATUS_SUCCEEDED, + wait_fixed=1500) + ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_SUCCEEDED, + wait_fixed=1500) + def test_pause_from_1_of_2_subworkflows_and_resume_subworkflow_when_workflow_paused(self): # Temp files are created during test setup. Ensure the temp files exist. path1 = self.temp_file_path_x @@ -305,8 +305,10 @@ def test_pause_from_all_subworkflows_and_resume_from_subworkflows(self): params = {'file1': path1, 'file2': path2} ex = self._execute_workflow('examples.orquesta-test-pause-subworkflows', params) ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_RUNNING) - tk1_exs = self._wait_for_task(ex, 'task1', ac_const.LIVEACTION_STATUS_RUNNING) - tk2_exs = self._wait_for_task(ex, 'task2', ac_const.LIVEACTION_STATUS_RUNNING) + tk1_exs = self._wait_for_task(ex, 'task1', ac_const.LIVEACTION_STATUS_RUNNING, + wait_fixed=1500) + tk2_exs = self._wait_for_task(ex, 'task2', ac_const.LIVEACTION_STATUS_RUNNING, + wait_fixed=1500) # Pause the subworkflow before the temp file is deleted. The task will be # paused but workflow and the other subworkflow will still be running. @@ -314,8 +316,10 @@ def test_pause_from_all_subworkflows_and_resume_from_subworkflows(self): # Expecting the workflow is still running and task1 is pausing. ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_RUNNING) - tk1_ac_ex = self._wait_for_state(tk1_ac_ex, ac_const.LIVEACTION_STATUS_PAUSING) - tk2_ac_ex = self._wait_for_state(tk2_exs[0], ac_const.LIVEACTION_STATUS_RUNNING) + tk1_ac_ex = self._wait_for_state(tk1_ac_ex, ac_const.LIVEACTION_STATUS_PAUSING, + wait_fixed=1500) + tk2_ac_ex = self._wait_for_state(tk2_exs[0], ac_const.LIVEACTION_STATUS_RUNNING, + wait_fixed=1500) # Pause the other subworkflow before the temp file is deleted. The main # workflow will still running because pause is initiated downstream. @@ -323,8 +327,10 @@ def test_pause_from_all_subworkflows_and_resume_from_subworkflows(self): # Expecting workflow and subworkflows are pausing. ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_RUNNING) - tk1_ac_ex = self._wait_for_state(tk1_ac_ex, ac_const.LIVEACTION_STATUS_PAUSING) - tk2_ac_ex = self._wait_for_state(tk2_ac_ex, ac_const.LIVEACTION_STATUS_PAUSING) + tk1_ac_ex = self._wait_for_state(tk1_ac_ex, ac_const.LIVEACTION_STATUS_PAUSING, + wait_fixed=1500) + tk2_ac_ex = self._wait_for_state(tk2_ac_ex, ac_const.LIVEACTION_STATUS_PAUSING, + wait_fixed=1500) # Delete the temporary files for the subworkflows. os.remove(path1) @@ -334,25 +340,33 @@ def test_pause_from_all_subworkflows_and_resume_from_subworkflows(self): # Wait for subworkflows to pause. The main workflow will also # pause now because no other task is running. - tk1_ac_ex = self._wait_for_state(tk1_ac_ex, ac_const.LIVEACTION_STATUS_PAUSED) - tk2_ac_ex = self._wait_for_state(tk2_ac_ex, ac_const.LIVEACTION_STATUS_PAUSED) + tk1_ac_ex = self._wait_for_state(tk1_ac_ex, ac_const.LIVEACTION_STATUS_PAUSED, + wait_fixed=1500) + tk2_ac_ex = self._wait_for_state(tk2_ac_ex, ac_const.LIVEACTION_STATUS_PAUSED, + wait_fixed=1500) ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_PAUSED) # Resume the subworkflow. tk1_ac_ex = self.st2client.executions.resume(tk1_ac_ex.id) # The subworkflow will succeed while the other subworkflow is still paused. - tk1_ac_ex = self._wait_for_state(tk1_ac_ex, ac_const.LIVEACTION_STATUS_SUCCEEDED) - tk2_ac_ex = self._wait_for_state(tk2_ac_ex, ac_const.LIVEACTION_STATUS_PAUSED) - ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_PAUSED) + tk1_ac_ex = self._wait_for_state(tk1_ac_ex, ac_const.LIVEACTION_STATUS_SUCCEEDED, + wait_fixed=1500) + tk2_ac_ex = self._wait_for_state(tk2_ac_ex, ac_const.LIVEACTION_STATUS_PAUSED, + wait_fixed=1500) + ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_PAUSED, + wait_fixed=1500) # Resume the other subworkflow. tk2_ac_ex = self.st2client.executions.resume(tk2_ac_ex.id) # Wait for completion. - tk1_ac_ex = self._wait_for_state(tk1_ac_ex, ac_const.LIVEACTION_STATUS_SUCCEEDED) - tk2_ac_ex = self._wait_for_state(tk2_ac_ex, ac_const.LIVEACTION_STATUS_SUCCEEDED) - ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_SUCCEEDED) + tk1_ac_ex = self._wait_for_state(tk1_ac_ex, ac_const.LIVEACTION_STATUS_SUCCEEDED, + wait_fixed=1500) + tk2_ac_ex = self._wait_for_state(tk2_ac_ex, ac_const.LIVEACTION_STATUS_SUCCEEDED, + wait_fixed=1500) + ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_SUCCEEDED, + wait_fixed=1500) def test_pause_from_all_subworkflows_and_resume_from_parent_workflow(self): # Temp files are created during test setup. Ensure the temp files exist. From dbf7727908f77e760aca9871adceaa61308489c8 Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Wed, 31 Jul 2019 17:27:39 +0200 Subject: [PATCH 5/6] Default it to a lowest value possible which still works for most of the tests. --- st2tests/integration/orquesta/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/st2tests/integration/orquesta/base.py b/st2tests/integration/orquesta/base.py index b7076fd4ce..373c5995d0 100644 --- a/st2tests/integration/orquesta/base.py +++ b/st2tests/integration/orquesta/base.py @@ -32,7 +32,7 @@ action_constants.LIVEACTION_STATUS_RUNNING ] -DEFAULT_WAIT_FIXED = 3000 +DEFAULT_WAIT_FIXED = 500 DEFAULT_STOP_MAX_DELAY = 900000 From 442641cbfa715a18d831c6a66e98b8955ff76188 Mon Sep 17 00:00:00 2001 From: W Chan Date: Thu, 1 Aug 2019 19:37:58 +0000 Subject: [PATCH 6/6] Revert the wait delay changes to orquesta itests Making the default wait_fixed to 500 is sufficient. The override to 1500 is unnecessary due to a bug in orquesta. --- .../orquesta_runner/in-requirements.txt | 2 +- .../runners/orquesta_runner/requirements.txt | 2 +- requirements.txt | 2 +- st2common/in-requirements.txt | 2 +- st2common/requirements.txt | 2 +- st2tests/integration/orquesta/base.py | 149 ++++++++---------- .../orquesta/test_wiring_pause_and_resume.py | 69 +++----- 7 files changed, 97 insertions(+), 131 deletions(-) diff --git a/contrib/runners/orquesta_runner/in-requirements.txt b/contrib/runners/orquesta_runner/in-requirements.txt index 1d23eba2d8..9ea1c723b6 100644 --- a/contrib/runners/orquesta_runner/in-requirements.txt +++ b/contrib/runners/orquesta_runner/in-requirements.txt @@ -1 +1 @@ -git+https://github.com/StackStorm/orquesta.git@224c1a589a6007eb0598a62ee99d674e7836d369#egg=orquesta +git+https://github.com/StackStorm/orquesta.git@e6ebbbeb2c661486067e659dc7552f0a986603a6#egg=orquesta diff --git a/contrib/runners/orquesta_runner/requirements.txt b/contrib/runners/orquesta_runner/requirements.txt index 882e3121e4..80f6ebf367 100644 --- a/contrib/runners/orquesta_runner/requirements.txt +++ b/contrib/runners/orquesta_runner/requirements.txt @@ -1,2 +1,2 @@ # Don't edit this file. It's generated automatically! -git+https://github.com/StackStorm/orquesta.git@224c1a589a6007eb0598a62ee99d674e7836d369#egg=orquesta +git+https://github.com/StackStorm/orquesta.git@e6ebbbeb2c661486067e659dc7552f0a986603a6#egg=orquesta diff --git a/requirements.txt b/requirements.txt index ee0714cdd8..fe7a5ba83c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,7 +8,7 @@ cryptography==2.6.1 eventlet==0.24.1 flex==6.14.0 git+https://github.com/Kami/logshipper.git@stackstorm_patched#egg=logshipper -git+https://github.com/StackStorm/orquesta.git@224c1a589a6007eb0598a62ee99d674e7836d369#egg=orquesta +git+https://github.com/StackStorm/orquesta.git@e6ebbbeb2c661486067e659dc7552f0a986603a6#egg=orquesta git+https://github.com/StackStorm/python-mistralclient.git#egg=python-mistralclient git+https://github.com/StackStorm/st2-auth-backend-flat-file.git@master#egg=st2-auth-backend-flat-file gitpython==2.1.11 diff --git a/st2common/in-requirements.txt b/st2common/in-requirements.txt index ed3b229110..eba34f6ea1 100644 --- a/st2common/in-requirements.txt +++ b/st2common/in-requirements.txt @@ -9,7 +9,7 @@ jsonschema kombu mongoengine networkx -git+https://github.com/StackStorm/orquesta.git@224c1a589a6007eb0598a62ee99d674e7836d369#egg=orquesta +git+https://github.com/StackStorm/orquesta.git@e6ebbbeb2c661486067e659dc7552f0a986603a6#egg=orquesta oslo.config paramiko pyyaml diff --git a/st2common/requirements.txt b/st2common/requirements.txt index 52c3ee54c3..2a69c90af4 100644 --- a/st2common/requirements.txt +++ b/st2common/requirements.txt @@ -4,7 +4,7 @@ apscheduler==3.6.0 cryptography==2.6.1 eventlet==0.24.1 flex==6.14.0 -git+https://github.com/StackStorm/orquesta.git@224c1a589a6007eb0598a62ee99d674e7836d369#egg=orquesta +git+https://github.com/StackStorm/orquesta.git@e6ebbbeb2c661486067e659dc7552f0a986603a6#egg=orquesta gitpython==2.1.11 greenlet==0.4.15 ipaddr diff --git a/st2tests/integration/orquesta/base.py b/st2tests/integration/orquesta/base.py index 373c5995d0..a1f16dbb81 100644 --- a/st2tests/integration/orquesta/base.py +++ b/st2tests/integration/orquesta/base.py @@ -85,99 +85,86 @@ def _execute_workflow(self, action, parameters=None, execute_async=True, return ex - def _wait_for_state(self, ex, states, wait_fixed=DEFAULT_WAIT_FIXED, - stop_max_delay=DEFAULT_STOP_MAX_DELAY): - - @retrying.retry( - retry_on_exception=retry_on_exceptions, - wait_fixed=wait_fixed, stop_max_delay=stop_max_delay) - def inner(ex, states): - if isinstance(states, six.string_types): - states = [states] - - for state in states: - if state not in action_constants.LIVEACTION_STATUSES: - raise ValueError('Status %s is not valid.' % state) - - try: - ex = self.st2client.executions.get_by_id(ex.id) - self.assertIn(ex.status, states) - except: - if ex.status in action_constants.LIVEACTION_COMPLETED_STATES: - raise Exception( - 'Execution is in completed state "%s" and ' - 'does not match expected state(s). %s' % - (ex.status, ex.result) - ) - else: - raise + @retrying.retry( + retry_on_exception=retry_on_exceptions, + wait_fixed=DEFAULT_WAIT_FIXED, stop_max_delay=DEFAULT_STOP_MAX_DELAY) + def _wait_for_state(self, ex, states): + if isinstance(states, six.string_types): + states = [states] + + for state in states: + if state not in action_constants.LIVEACTION_STATUSES: + raise ValueError('Status %s is not valid.' % state) + + try: + ex = self.st2client.executions.get_by_id(ex.id) + self.assertIn(ex.status, states) + except: + if ex.status in action_constants.LIVEACTION_COMPLETED_STATES: + raise Exception( + 'Execution is in completed state "%s" and ' + 'does not match expected state(s). %s' % + (ex.status, ex.result) + ) + else: + raise - return ex - return inner(ex=ex, states=states) + return ex def _get_children(self, ex): return self.st2client.executions.query(parent=ex.id) - def _wait_for_task(self, ex, task, status=None, num_task_exs=1, - wait_fixed=DEFAULT_WAIT_FIXED, - stop_max_delay=DEFAULT_STOP_MAX_DELAY): - - @retrying.retry( - retry_on_exception=retry_on_exceptions, - wait_fixed=wait_fixed, stop_max_delay=stop_max_delay) - def inner(ex, task, status, num_task_exs): - ex = self.st2client.executions.get_by_id(ex.id) - - task_exs = [ - task_ex for task_ex in self._get_children(ex) - if task_ex.context.get('orquesta', {}).get('task_name', '') == task - ] + @retrying.retry( + retry_on_exception=retry_on_exceptions, + wait_fixed=DEFAULT_WAIT_FIXED, stop_max_delay=DEFAULT_STOP_MAX_DELAY) + def _wait_for_task(self, ex, task, status=None, num_task_exs=1): + ex = self.st2client.executions.get_by_id(ex.id) + + task_exs = [ + task_ex for task_ex in self._get_children(ex) + if task_ex.context.get('orquesta', {}).get('task_name', '') == task + ] + + try: + self.assertEqual(len(task_exs), num_task_exs) + except: + if ex.status in action_constants.LIVEACTION_COMPLETED_STATES: + raise Exception( + 'Execution is in completed state and does not match expected number of ' + 'tasks. Expected: %s Actual: %s' % (str(num_task_exs), str(len(task_exs))) + ) + else: + raise + if status is not None: try: - self.assertEqual(len(task_exs), num_task_exs) + self.assertTrue(all([task_ex.status == status for task_ex in task_exs])) except: if ex.status in action_constants.LIVEACTION_COMPLETED_STATES: raise Exception( - 'Execution is in completed state and does not match expected number of ' - 'tasks. Expected: %s Actual: %s' % (str(num_task_exs), str(len(task_exs))) + 'Execution is in completed state and not all tasks ' + 'match expected status "%s".' % status ) else: raise - if status is not None: - try: - self.assertTrue(all([task_ex.status == status for task_ex in task_exs])) - except: - if ex.status in action_constants.LIVEACTION_COMPLETED_STATES: - raise Exception( - 'Execution is in completed state and not all tasks ' - 'match expected status "%s".' % status - ) - else: - raise - - return task_exs - return inner(ex=ex, task=task, status=status, num_task_exs=num_task_exs) - - def _wait_for_completion(self, ex, wait_fixed=DEFAULT_WAIT_FIXED, - stop_max_delay=DEFAULT_STOP_MAX_DELAY): - - @retrying.retry( - retry_on_exception=retry_on_exceptions, - wait_fixed=wait_fixed, stop_max_delay=stop_max_delay) - def inner(ex): - ex = self._wait_for_state(ex, action_constants.LIVEACTION_COMPLETED_STATES) - - try: - self.assertTrue(hasattr(ex, 'result')) - except: - if ex.status in action_constants.LIVEACTION_COMPLETED_STATES: - raise Exception( - 'Execution is in completed state and does not ' - 'contain expected result.' - ) - else: - raise + return task_exs + + @retrying.retry( + retry_on_exception=retry_on_exceptions, + wait_fixed=DEFAULT_WAIT_FIXED, stop_max_delay=DEFAULT_STOP_MAX_DELAY) + def _wait_for_completion(self, ex): + ex = self._wait_for_state(ex, action_constants.LIVEACTION_COMPLETED_STATES) + + try: + self.assertTrue(hasattr(ex, 'result')) + except: + if ex.status in action_constants.LIVEACTION_COMPLETED_STATES: + raise Exception( + 'Execution is in completed state and does not ' + 'contain expected result.' + ) + else: + raise - return ex - return inner(ex=ex) + return ex diff --git a/st2tests/integration/orquesta/test_wiring_pause_and_resume.py b/st2tests/integration/orquesta/test_wiring_pause_and_resume.py index 08aaadbc1f..6e8a3abe7e 100644 --- a/st2tests/integration/orquesta/test_wiring_pause_and_resume.py +++ b/st2tests/integration/orquesta/test_wiring_pause_and_resume.py @@ -161,39 +161,32 @@ def test_pause_and_resume_cascade_from_subworkflow(self): # Launch the workflow. The workflow will wait for the temp file to be deleted. params = {'tempfile': path} ex = self._execute_workflow('examples.orquesta-test-pause-subworkflow', params) - - ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_RUNNING, wait_fixed=1500) - tk_exs = self._wait_for_task(ex, 'task1', ac_const.LIVEACTION_STATUS_RUNNING, - wait_fixed=1500) + ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_RUNNING) + tk_exs = self._wait_for_task(ex, 'task1', ac_const.LIVEACTION_STATUS_RUNNING) # Pause the subworkflow before the temp file is deleted. The task will be # paused but workflow will still be running. tk_ac_ex = self.st2client.executions.pause(tk_exs[0].id) # Expecting the workflow is still running and task1 is pausing. - ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_RUNNING, - wait_fixed=1500) - tk_ac_ex = self._wait_for_state(tk_ac_ex, ac_const.LIVEACTION_STATUS_PAUSING, - wait_fixed=1500) + ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_RUNNING) + tk_ac_ex = self._wait_for_state(tk_ac_ex, ac_const.LIVEACTION_STATUS_PAUSING) # Delete the temporary file. os.remove(path) self.assertFalse(os.path.exists(path)) - tk_ac_ex = self._wait_for_state(tk_ac_ex, ac_const.LIVEACTION_STATUS_PAUSED, - wait_fixed=1500) - ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_PAUSED, - wait_fixed=1500) + # Wait for the workflow and task to be paused. + tk_ac_ex = self._wait_for_state(tk_ac_ex, ac_const.LIVEACTION_STATUS_PAUSED) + ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_PAUSED) # Resume the task. tk_ac_ex = self.st2client.executions.resume(tk_ac_ex.id) # Wait for completion. - tk_ac_ex = self._wait_for_state(tk_ac_ex, ac_const.LIVEACTION_STATUS_SUCCEEDED, - wait_fixed=1500) - ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_SUCCEEDED, - wait_fixed=1500) - + tk_ac_ex = self._wait_for_state(tk_ac_ex, ac_const.LIVEACTION_STATUS_SUCCEEDED) + ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_SUCCEEDED) + def test_pause_from_1_of_2_subworkflows_and_resume_subworkflow_when_workflow_paused(self): # Temp files are created during test setup. Ensure the temp files exist. path1 = self.temp_file_path_x @@ -305,10 +298,8 @@ def test_pause_from_all_subworkflows_and_resume_from_subworkflows(self): params = {'file1': path1, 'file2': path2} ex = self._execute_workflow('examples.orquesta-test-pause-subworkflows', params) ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_RUNNING) - tk1_exs = self._wait_for_task(ex, 'task1', ac_const.LIVEACTION_STATUS_RUNNING, - wait_fixed=1500) - tk2_exs = self._wait_for_task(ex, 'task2', ac_const.LIVEACTION_STATUS_RUNNING, - wait_fixed=1500) + tk1_exs = self._wait_for_task(ex, 'task1', ac_const.LIVEACTION_STATUS_RUNNING) + tk2_exs = self._wait_for_task(ex, 'task2', ac_const.LIVEACTION_STATUS_RUNNING) # Pause the subworkflow before the temp file is deleted. The task will be # paused but workflow and the other subworkflow will still be running. @@ -316,10 +307,8 @@ def test_pause_from_all_subworkflows_and_resume_from_subworkflows(self): # Expecting the workflow is still running and task1 is pausing. ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_RUNNING) - tk1_ac_ex = self._wait_for_state(tk1_ac_ex, ac_const.LIVEACTION_STATUS_PAUSING, - wait_fixed=1500) - tk2_ac_ex = self._wait_for_state(tk2_exs[0], ac_const.LIVEACTION_STATUS_RUNNING, - wait_fixed=1500) + tk1_ac_ex = self._wait_for_state(tk1_ac_ex, ac_const.LIVEACTION_STATUS_PAUSING) + tk2_ac_ex = self._wait_for_state(tk2_exs[0], ac_const.LIVEACTION_STATUS_RUNNING) # Pause the other subworkflow before the temp file is deleted. The main # workflow will still running because pause is initiated downstream. @@ -327,10 +316,8 @@ def test_pause_from_all_subworkflows_and_resume_from_subworkflows(self): # Expecting workflow and subworkflows are pausing. ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_RUNNING) - tk1_ac_ex = self._wait_for_state(tk1_ac_ex, ac_const.LIVEACTION_STATUS_PAUSING, - wait_fixed=1500) - tk2_ac_ex = self._wait_for_state(tk2_ac_ex, ac_const.LIVEACTION_STATUS_PAUSING, - wait_fixed=1500) + tk1_ac_ex = self._wait_for_state(tk1_ac_ex, ac_const.LIVEACTION_STATUS_PAUSING) + tk2_ac_ex = self._wait_for_state(tk2_ac_ex, ac_const.LIVEACTION_STATUS_PAUSING) # Delete the temporary files for the subworkflows. os.remove(path1) @@ -340,33 +327,25 @@ def test_pause_from_all_subworkflows_and_resume_from_subworkflows(self): # Wait for subworkflows to pause. The main workflow will also # pause now because no other task is running. - tk1_ac_ex = self._wait_for_state(tk1_ac_ex, ac_const.LIVEACTION_STATUS_PAUSED, - wait_fixed=1500) - tk2_ac_ex = self._wait_for_state(tk2_ac_ex, ac_const.LIVEACTION_STATUS_PAUSED, - wait_fixed=1500) + tk1_ac_ex = self._wait_for_state(tk1_ac_ex, ac_const.LIVEACTION_STATUS_PAUSED) + tk2_ac_ex = self._wait_for_state(tk2_ac_ex, ac_const.LIVEACTION_STATUS_PAUSED) ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_PAUSED) # Resume the subworkflow. tk1_ac_ex = self.st2client.executions.resume(tk1_ac_ex.id) # The subworkflow will succeed while the other subworkflow is still paused. - tk1_ac_ex = self._wait_for_state(tk1_ac_ex, ac_const.LIVEACTION_STATUS_SUCCEEDED, - wait_fixed=1500) - tk2_ac_ex = self._wait_for_state(tk2_ac_ex, ac_const.LIVEACTION_STATUS_PAUSED, - wait_fixed=1500) - ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_PAUSED, - wait_fixed=1500) + tk1_ac_ex = self._wait_for_state(tk1_ac_ex, ac_const.LIVEACTION_STATUS_SUCCEEDED) + tk2_ac_ex = self._wait_for_state(tk2_ac_ex, ac_const.LIVEACTION_STATUS_PAUSED) + ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_PAUSED) # Resume the other subworkflow. tk2_ac_ex = self.st2client.executions.resume(tk2_ac_ex.id) # Wait for completion. - tk1_ac_ex = self._wait_for_state(tk1_ac_ex, ac_const.LIVEACTION_STATUS_SUCCEEDED, - wait_fixed=1500) - tk2_ac_ex = self._wait_for_state(tk2_ac_ex, ac_const.LIVEACTION_STATUS_SUCCEEDED, - wait_fixed=1500) - ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_SUCCEEDED, - wait_fixed=1500) + tk1_ac_ex = self._wait_for_state(tk1_ac_ex, ac_const.LIVEACTION_STATUS_SUCCEEDED) + tk2_ac_ex = self._wait_for_state(tk2_ac_ex, ac_const.LIVEACTION_STATUS_SUCCEEDED) + ex = self._wait_for_state(ex, ac_const.LIVEACTION_STATUS_SUCCEEDED) def test_pause_from_all_subworkflows_and_resume_from_parent_workflow(self): # Temp files are created during test setup. Ensure the temp files exist.