From 1d7edbe395d8ea1cb7684122b38d1b07d985ef30 Mon Sep 17 00:00:00 2001 From: W Chan Date: Mon, 11 Feb 2019 22:50:26 +0000 Subject: [PATCH 01/11] Refactor scheduler process to exit properly Add retries in the scheduler handler to temporarily handle DB connection failures. Refactor how threads exit for the process to return proper code. --- st2actions/st2actions/cmd/scheduler.py | 11 +++++- st2actions/st2actions/scheduler/config.py | 7 +++- st2actions/st2actions/scheduler/handler.py | 45 +++++++++++++++++----- st2common/st2common/util/service.py | 36 +++++++++++++++++ 4 files changed, 88 insertions(+), 11 deletions(-) create mode 100644 st2common/st2common/util/service.py diff --git a/st2actions/st2actions/cmd/scheduler.py b/st2actions/st2actions/cmd/scheduler.py index 1ae0096084..8cc48f8cee 100644 --- a/st2actions/st2actions/cmd/scheduler.py +++ b/st2actions/st2actions/cmd/scheduler.py @@ -51,7 +51,9 @@ def _run_queuer(): try: handler.start() entrypoint.start() - entrypoint.wait() + + # Wait on handler first since entrypoint is more durable. 
+ handler.wait() or entrypoint.wait() except (KeyboardInterrupt, SystemExit): LOG.info('(PID=%s) Scheduler stopped.', os.getpid()) @@ -68,6 +70,13 @@ def _run_queuer(): return 1 except: LOG.exception('(PID=%s) Scheduler unexpectedly stopped.', os.getpid()) + + try: + handler.shutdown() + entrypoint.shutdown() + except: + pass + return 1 return 0 diff --git a/st2actions/st2actions/scheduler/config.py b/st2actions/st2actions/scheduler/config.py index ed7d7477be..552042bc0a 100644 --- a/st2actions/st2actions/scheduler/config.py +++ b/st2actions/st2actions/scheduler/config.py @@ -56,7 +56,12 @@ def _register_service_opts(): 'gc_interval', default=10, help='How often (in seconds) to look for zombie execution requests before rescheduling ' 'them.'), - + cfg.IntOpt( + 'retry_max_attempt', default=10, + help='The maximum number of attempts that the scheduler retries on error.'), + cfg.IntOpt( + 'retry_wait_msec', default=3000, + help='The number of milliseconds to wait in between retries.') ] cfg.CONF.register_opts(scheduler_opts, group='scheduler') diff --git a/st2actions/st2actions/scheduler/handler.py b/st2actions/st2actions/scheduler/handler.py index e93adf6ec9..74027b88ed 100644 --- a/st2actions/st2actions/scheduler/handler.py +++ b/st2actions/st2actions/scheduler/handler.py @@ -16,10 +16,12 @@ from __future__ import absolute_import import eventlet +import retrying from oslo_config import cfg from st2common import log as logging from st2common.util import date +from st2common.util import service as service_utils from st2common.constants import action as action_constants from st2common.constants import policy as policy_constants from st2common.exceptions.db import StackStormDBObjectNotFoundError @@ -60,25 +62,37 @@ def __init__(self): self._shutdown = False self._pool = eventlet.GreenPool(size=cfg.CONF.scheduler.pool_size) self._coordinator = coordination_service.get_coordinator() + self._main_thread = None + self._cleanup_thread = None def run(self): - 
LOG.debug('Entering scheduler loop') + LOG.debug('Starting scheduler handler...') while not self._shutdown: eventlet.greenthread.sleep(cfg.CONF.scheduler.sleep_interval) + self.process() - execution_queue_item_db = self._get_next_execution() + @retrying.retry( + retry_on_exception=service_utils.retry_on_exceptions, + stop_max_attempt_number=cfg.CONF.scheduler.retry_max_attempt, + wait_fixed=cfg.CONF.scheduler.retry_wait_msec) + def process(self): + execution_queue_item_db = self._get_next_execution() - if execution_queue_item_db: - self._pool.spawn(self._handle_execution, execution_queue_item_db) + if execution_queue_item_db: + self._pool.spawn(self._handle_execution, execution_queue_item_db) def cleanup(self): - LOG.debug('Starting scheduler garbage collection') + LOG.debug('Starting scheduler garbage collection...') while not self._shutdown: eventlet.greenthread.sleep(cfg.CONF.scheduler.gc_interval) self._handle_garbage_collection() + @retrying.retry( + retry_on_exception=service_utils.retry_on_exceptions, + stop_max_attempt_number=cfg.CONF.scheduler.retry_max_attempt, + wait_fixed=cfg.CONF.scheduler.retry_wait_msec) def _handle_garbage_collection(self): """ Periodically look for executions which have "handling" set to "True" and haven't been @@ -328,11 +342,24 @@ def _update_to_scheduled(liveaction_db, execution_queue_item_db): def start(self): self._shutdown = False - eventlet.spawn(self.run) - eventlet.spawn(self.cleanup) + # Spawn the worker threads. + self._main_thread = eventlet.spawn(self.run) + self._cleanup_thread = eventlet.spawn(self.cleanup) - def shutdown(self): - self._shutdown = True + # Link the threads to the shutdown function. If either of the threads exited with error, + # then initiate shutdown which will allow the waits below to throw exception to the + # main process. 
+ self._main_thread.link(self.shutdown) + self._cleanup_thread.link(self.shutdown) + + def shutdown(self, *args, **kwargs): + if not self._shutdown: + self._shutdown = True + + def wait(self): + # Wait for the worker threads to complete. If there is an exception thrown in the thread, + # then the exception will be propagated to the main process for a proper return code. + self._main_thread.wait() or self._cleanup_thread.wait() def get_handler(): diff --git a/st2common/st2common/util/service.py b/st2common/st2common/util/service.py new file mode 100644 index 0000000000..9e9a7df92d --- /dev/null +++ b/st2common/st2common/util/service.py @@ -0,0 +1,36 @@ +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import + +import pymongo + +from st2common import log as logging + + +LOG = logging.getLogger(__name__) + + +def retry_on_exceptions(exc): + LOG.warning('Evaluating retry on exception %s. 
%s', type(exc), str(exc)) + + is_mongo_connection_error = isinstance(exc, pymongo.errors.ConnectionFailure) + + retrying = is_mongo_connection_error + + if retrying: + LOG.warning('Retrying on exception %s.', type(exc)) + + return retrying From 51cb7c275b33e5a11a6c6460ebef4893286fa76f Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Tue, 12 Feb 2019 10:20:19 +0100 Subject: [PATCH 02/11] Add missing license header. --- st2actions/st2actions/cmd/scheduler.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/st2actions/st2actions/cmd/scheduler.py b/st2actions/st2actions/cmd/scheduler.py index 8cc48f8cee..fad2aa22b4 100644 --- a/st2actions/st2actions/cmd/scheduler.py +++ b/st2actions/st2actions/cmd/scheduler.py @@ -1,6 +1,21 @@ +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. # Monkey patching should be done as early as possible. # See http://eventlet.net/doc/patching.html#monkeypatching-the-standard-library from __future__ import absolute_import + from st2common.util.monkey_patch import monkey_patch monkey_patch() From 092e066499e9e03f424087388691158716b224c3 Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Tue, 12 Feb 2019 10:26:16 +0100 Subject: [PATCH 03/11] Fix function name. 
--- st2actions/st2actions/cmd/scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/st2actions/st2actions/cmd/scheduler.py b/st2actions/st2actions/cmd/scheduler.py index fad2aa22b4..9e548d6ebb 100644 --- a/st2actions/st2actions/cmd/scheduler.py +++ b/st2actions/st2actions/cmd/scheduler.py @@ -51,7 +51,7 @@ def _setup(): _setup_sigterm_handler() -def _run_queuer(): +def _run_scheduler(): LOG.info('(PID=%s) Scheduler started.', os.getpid()) # Lazy load these so that decorator metrics are in place @@ -104,7 +104,7 @@ def _teardown(): def main(): try: _setup() - return _run_queuer() + return _run_scheduler() except SystemExit as exit_code: sys.exit(exit_code) except: From 442c57e1f23a42b48dd7b7550731c4d9beac7df7 Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Tue, 12 Feb 2019 10:57:15 +0100 Subject: [PATCH 04/11] Add a test case for scheduler correctly exiting on handler and entrypoint exceptions. NOTE: Tests currently fail because issue hasn't been fully fixed yet. --- .../tests/unit/test_scheduler_entrypoint.py | 119 ++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 st2actions/tests/unit/test_scheduler_entrypoint.py diff --git a/st2actions/tests/unit/test_scheduler_entrypoint.py b/st2actions/tests/unit/test_scheduler_entrypoint.py new file mode 100644 index 0000000000..300e37c224 --- /dev/null +++ b/st2actions/tests/unit/test_scheduler_entrypoint.py @@ -0,0 +1,119 @@ +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import eventlet +import mock + +from st2actions.cmd.scheduler import _run_scheduler +from st2actions.scheduler.handler import ActionExecutionSchedulingQueueHandler +from st2actions.scheduler.entrypoint import SchedulerEntrypoint + +from st2tests.base import CleanDbTestCase + +__all__ = [ + 'SchedulerServiceEntryPointTestCase' +] + +def mock_handler_run(self): + # NOTE: We use eventlet.sleep to emulate async nature of this process + eventlet.sleep(0.2) + raise Exception('handler run exception') + + +def mock_handler_start_wait(self): + """ + This method emulates exception being throw in async nature in cls.process() + method. 
+ """ + # NOTE: We use eventlet.sleep to emulate async nature of this process + eventlet.sleep(0.2) + + # Mock call to process() to emulate .wait() and not .start() throwing + eventlet.spawn(self.process, mock.Mock()) + + +def mock_handler_process(self, request): + # NOTE: We use eventlet.sleep to emulate async nature of this process + eventlet.sleep(0.2) + raise Exception('handler process exception') + + +def mock_entrypoint_start(self): + # NOTE: We use eventlet.sleep to emulate async nature of this process + eventlet.sleep(0.2) + raise Exception('entrypoint start exception') + + +def mock_entrypoint_start_wait(self): + # NOTE: We use eventlet.sleep to emulate async nature of this process + eventlet.sleep(0.2) + + # Mock call to process() to emulate .wait() and not .start() throwing + eventlet.spawn(self.process, mock.Mock()) + + +def mock_entrypoint_process(self, request): + # NOTE: We use eventlet.sleep to emulate async nature of this process + eventlet.sleep(0.2) + raise Exception('entrypoint process exception') + + +class SchedulerServiceEntryPointTestCase(CleanDbTestCase): + @mock.patch.object(ActionExecutionSchedulingQueueHandler, 'run', mock_handler_run) + @mock.patch('st2actions.cmd.scheduler.LOG') + def test_service_exits_correctly_on_fatal_exception_in_handler_run(self, mock_log): + run_thread = eventlet.spawn(_run_scheduler) + result = run_thread.wait() + + self.assertEqual(result, 1) + + mock_log_exception_call = mock_log.exception.call_args_list[0][0][0] + self.assertTrue('Scheduler unexpectedly stopped' in mock_log_exception_call) + + @mock.patch.object(ActionExecutionSchedulingQueueHandler, 'start', mock_handler_start_wait) + @mock.patch.object(ActionExecutionSchedulingQueueHandler, 'process', mock_handler_process) + @mock.patch('st2actions.cmd.scheduler.LOG') + def test_service_exits_correctly_on_fatal_exception_in_handler_process(self, mock_log): + run_thread = eventlet.spawn(_run_scheduler) + result = run_thread.wait() + + 
self.assertEqual(result, 1) + + mock_log_exception_call = mock_log.exception.call_args_list[0][0][0] + self.assertTrue('Scheduler unexpectedly stopped' in mock_log_exception_call) + + + @mock.patch.object(SchedulerEntrypoint, 'start', mock_entrypoint_start) + @mock.patch('st2actions.cmd.scheduler.LOG') + def test_service_exits_correctly_on_fatal_exception_in_entrypoint_start(self, mock_log): + run_thread = eventlet.spawn(_run_scheduler) + result = run_thread.wait() + + self.assertEqual(result, 1) + + mock_log_exception_call = mock_log.exception.call_args_list[0][0][0] + self.assertTrue('Scheduler unexpectedly stopped' in mock_log_exception_call) + + @mock.patch.object(SchedulerEntrypoint, 'start', mock_entrypoint_start_wait) + @mock.patch.object(SchedulerEntrypoint, 'process', mock_entrypoint_process) + @mock.patch('st2actions.cmd.scheduler.LOG') + def test_service_exits_correctly_on_fatal_exception_in_entrypoint_process(self, mock_log): + run_thread = eventlet.spawn(_run_scheduler) + result = run_thread.wait() + + self.assertEqual(result, 1) + + mock_log_exception_call = mock_log.exception.call_args_list[0][0][0] + self.assertTrue('Scheduler unexpectedly stopped' in mock_log_exception_call) From 14370048116bed04aaa6adaa20f03f1b4356fb38 Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Tue, 12 Feb 2019 11:08:29 +0100 Subject: [PATCH 05/11] Make binaries executable. --- st2actions/bin/runners.sh | 0 st2actions/bin/st2scheduler | 0 2 files changed, 0 insertions(+), 0 deletions(-) mode change 100644 => 100755 st2actions/bin/runners.sh mode change 100644 => 100755 st2actions/bin/st2scheduler diff --git a/st2actions/bin/runners.sh b/st2actions/bin/runners.sh old mode 100644 new mode 100755 diff --git a/st2actions/bin/st2scheduler b/st2actions/bin/st2scheduler old mode 100644 new mode 100755 From 6371ef1a43fd754d95e9d189b20fd4581cfea2e3 Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Tue, 12 Feb 2019 12:10:43 +0100 Subject: [PATCH 06/11] Update tests. 
--- .../tests/unit/test_scheduler_entrypoint.py | 58 +------------------ 1 file changed, 1 insertion(+), 57 deletions(-) diff --git a/st2actions/tests/unit/test_scheduler_entrypoint.py b/st2actions/tests/unit/test_scheduler_entrypoint.py index 300e37c224..1528f8d284 100644 --- a/st2actions/tests/unit/test_scheduler_entrypoint.py +++ b/st2actions/tests/unit/test_scheduler_entrypoint.py @@ -26,50 +26,19 @@ 'SchedulerServiceEntryPointTestCase' ] + def mock_handler_run(self): # NOTE: We use eventlet.sleep to emulate async nature of this process eventlet.sleep(0.2) raise Exception('handler run exception') -def mock_handler_start_wait(self): - """ - This method emulates exception being throw in async nature in cls.process() - method. - """ - # NOTE: We use eventlet.sleep to emulate async nature of this process - eventlet.sleep(0.2) - - # Mock call to process() to emulate .wait() and not .start() throwing - eventlet.spawn(self.process, mock.Mock()) - - -def mock_handler_process(self, request): - # NOTE: We use eventlet.sleep to emulate async nature of this process - eventlet.sleep(0.2) - raise Exception('handler process exception') - - def mock_entrypoint_start(self): # NOTE: We use eventlet.sleep to emulate async nature of this process eventlet.sleep(0.2) raise Exception('entrypoint start exception') -def mock_entrypoint_start_wait(self): - # NOTE: We use eventlet.sleep to emulate async nature of this process - eventlet.sleep(0.2) - - # Mock call to process() to emulate .wait() and not .start() throwing - eventlet.spawn(self.process, mock.Mock()) - - -def mock_entrypoint_process(self, request): - # NOTE: We use eventlet.sleep to emulate async nature of this process - eventlet.sleep(0.2) - raise Exception('entrypoint process exception') - - class SchedulerServiceEntryPointTestCase(CleanDbTestCase): @mock.patch.object(ActionExecutionSchedulingQueueHandler, 'run', mock_handler_run) @mock.patch('st2actions.cmd.scheduler.LOG') @@ -82,19 +51,6 @@ def 
test_service_exits_correctly_on_fatal_exception_in_handler_run(self, mock_lo mock_log_exception_call = mock_log.exception.call_args_list[0][0][0] self.assertTrue('Scheduler unexpectedly stopped' in mock_log_exception_call) - @mock.patch.object(ActionExecutionSchedulingQueueHandler, 'start', mock_handler_start_wait) - @mock.patch.object(ActionExecutionSchedulingQueueHandler, 'process', mock_handler_process) - @mock.patch('st2actions.cmd.scheduler.LOG') - def test_service_exits_correctly_on_fatal_exception_in_handler_process(self, mock_log): - run_thread = eventlet.spawn(_run_scheduler) - result = run_thread.wait() - - self.assertEqual(result, 1) - - mock_log_exception_call = mock_log.exception.call_args_list[0][0][0] - self.assertTrue('Scheduler unexpectedly stopped' in mock_log_exception_call) - - @mock.patch.object(SchedulerEntrypoint, 'start', mock_entrypoint_start) @mock.patch('st2actions.cmd.scheduler.LOG') def test_service_exits_correctly_on_fatal_exception_in_entrypoint_start(self, mock_log): @@ -105,15 +61,3 @@ def test_service_exits_correctly_on_fatal_exception_in_entrypoint_start(self, mo mock_log_exception_call = mock_log.exception.call_args_list[0][0][0] self.assertTrue('Scheduler unexpectedly stopped' in mock_log_exception_call) - - @mock.patch.object(SchedulerEntrypoint, 'start', mock_entrypoint_start_wait) - @mock.patch.object(SchedulerEntrypoint, 'process', mock_entrypoint_process) - @mock.patch('st2actions.cmd.scheduler.LOG') - def test_service_exits_correctly_on_fatal_exception_in_entrypoint_process(self, mock_log): - run_thread = eventlet.spawn(_run_scheduler) - result = run_thread.wait() - - self.assertEqual(result, 1) - - mock_log_exception_call = mock_log.exception.call_args_list[0][0][0] - self.assertTrue('Scheduler unexpectedly stopped' in mock_log_exception_call) From 359ae246da0bf07e214bf43d716fd3f9b74efefa Mon Sep 17 00:00:00 2001 From: W Chan Date: Tue, 12 Feb 2019 20:01:20 +0000 Subject: [PATCH 07/11] Regenerated the sample st2 config 
Regenerated the sample st2 config with the scheduler retry configuration options. --- conf/st2.conf.sample | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/conf/st2.conf.sample b/conf/st2.conf.sample index d5215206e1..7f21a3988e 100644 --- a/conf/st2.conf.sample +++ b/conf/st2.conf.sample @@ -245,14 +245,18 @@ thread_pool_size = 10 logging = /etc/st2/logging.rulesengine.conf [scheduler] -# How long (in seconds) to sleep between each action scheduler main loop run interval. -sleep_interval = 0.1 -# How often (in seconds) to look for zombie execution requests before rescheduling them. -gc_interval = 10 +# The maximum number of attempts that the scheduler retries on error. +retry_max_attempt = 10 # Location of the logging configuration file. logging = /etc/st2/logging.scheduler.conf +# How long (in seconds) to sleep between each action scheduler main loop run interval. +sleep_interval = 0.1 # The size of the pool used by the scheduler for scheduling executions. pool_size = 10 +# The number of milliseconds to wait in between retries. +retry_wait_msec = 3000 +# How often (in seconds) to look for zombie execution requests before rescheduling them. +gc_interval = 10 [schema] # Version of JSON schema to use. From 5aa3f6e6f81bcfee73f2169856957c03e042795e Mon Sep 17 00:00:00 2001 From: W Chan Date: Tue, 12 Feb 2019 20:16:35 +0000 Subject: [PATCH 08/11] Add unit test to cover the handler cleanup Add a unit test to cover failure in the handler cleanup. This should signal the run method to also pause and exit the scheduler handler process. 
--- .../tests/unit/test_scheduler_entrypoint.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/st2actions/tests/unit/test_scheduler_entrypoint.py b/st2actions/tests/unit/test_scheduler_entrypoint.py index 1528f8d284..9e0282a610 100644 --- a/st2actions/tests/unit/test_scheduler_entrypoint.py +++ b/st2actions/tests/unit/test_scheduler_entrypoint.py @@ -33,6 +33,12 @@ def mock_handler_run(self): raise Exception('handler run exception') +def mock_handler_cleanup(self): + # NOTE: We use eventlet.sleep to emulate async nature of this process + eventlet.sleep(0.2) + raise Exception('handler clean exception') + + def mock_entrypoint_start(self): # NOTE: We use eventlet.sleep to emulate async nature of this process eventlet.sleep(0.2) @@ -51,6 +57,17 @@ def test_service_exits_correctly_on_fatal_exception_in_handler_run(self, mock_lo mock_log_exception_call = mock_log.exception.call_args_list[0][0][0] self.assertTrue('Scheduler unexpectedly stopped' in mock_log_exception_call) + @mock.patch.object(ActionExecutionSchedulingQueueHandler, 'cleanup', mock_handler_cleanup) + @mock.patch('st2actions.cmd.scheduler.LOG') + def test_service_exits_correctly_on_fatal_exception_in_handler_cleanup(self, mock_log): + run_thread = eventlet.spawn(_run_scheduler) + result = run_thread.wait() + + self.assertEqual(result, 1) + + mock_log_exception_call = mock_log.exception.call_args_list[0][0][0] + self.assertTrue('Scheduler unexpectedly stopped' in mock_log_exception_call) + @mock.patch.object(SchedulerEntrypoint, 'start', mock_entrypoint_start) @mock.patch('st2actions.cmd.scheduler.LOG') def test_service_exits_correctly_on_fatal_exception_in_entrypoint_start(self, mock_log): From 86221e060b437409bd90c6c8c859940f7797f502 Mon Sep 17 00:00:00 2001 From: W Chan Date: Tue, 12 Feb 2019 22:12:19 +0000 Subject: [PATCH 09/11] Add unit tests to cover the retries in scheduler handler Add unit tests to cover the retries in the run and cleanup in the scheduler handler. 
--- st2actions/tests/unit/test_scheduler_retry.py | 120 ++++++++++++++++++ st2tests/st2tests/config.py | 6 + 2 files changed, 126 insertions(+) create mode 100644 st2actions/tests/unit/test_scheduler_retry.py diff --git a/st2actions/tests/unit/test_scheduler_retry.py b/st2actions/tests/unit/test_scheduler_retry.py new file mode 100644 index 0000000000..51756e2ce0 --- /dev/null +++ b/st2actions/tests/unit/test_scheduler_retry.py @@ -0,0 +1,120 @@ +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import eventlet +import mock +import pymongo +import uuid + +from st2tests import config as test_config +test_config.parse_args() + +from st2actions.scheduler import handler +from st2common.models.db import execution_queue as ex_q_db +from st2common.persistence import execution_queue as ex_q_db_access +from st2tests.base import CleanDbTestCase + + +__all__ = [ + 'SchedulerHandlerRetryTestCase' +] + + +MOCK_QUEUE_ITEM = ex_q_db.ActionExecutionSchedulingQueueItemDB(liveaction_id=uuid.uuid4().hex) + + +class SchedulerHandlerRetryTestCase(CleanDbTestCase): + + @mock.patch.object( + handler.ActionExecutionSchedulingQueueHandler, '_get_next_execution', + mock.MagicMock(side_effect=[pymongo.errors.ConnectionFailure(), MOCK_QUEUE_ITEM])) + @mock.patch.object( + eventlet.GreenPool, 'spawn', + mock.MagicMock(return_value=None)) + def test_handler_retry_connection_error(self): + scheduling_queue_handler = handler.ActionExecutionSchedulingQueueHandler() + scheduling_queue_handler.process() + + # Make sure retry occurs and that _handle_execution in process is called. 
+ calls = [mock.call(scheduling_queue_handler._handle_execution, MOCK_QUEUE_ITEM)] + eventlet.GreenPool.spawn.assert_has_calls(calls) + + @mock.patch.object( + handler.ActionExecutionSchedulingQueueHandler, '_get_next_execution', + mock.MagicMock(side_effect=[pymongo.errors.ConnectionFailure()] * 3)) + @mock.patch.object( + eventlet.GreenPool, 'spawn', + mock.MagicMock(return_value=None)) + def test_handler_retries_exhausted(self): + scheduling_queue_handler = handler.ActionExecutionSchedulingQueueHandler() + self.assertRaises(pymongo.errors.ConnectionFailure, scheduling_queue_handler.process) + self.assertEqual(eventlet.GreenPool.spawn.call_count, 0) + + @mock.patch.object( + handler.ActionExecutionSchedulingQueueHandler, '_get_next_execution', + mock.MagicMock(side_effect=KeyError())) + @mock.patch.object( + eventlet.GreenPool, 'spawn', + mock.MagicMock(return_value=None)) + def test_handler_retry_unexpected_error(self): + scheduling_queue_handler = handler.ActionExecutionSchedulingQueueHandler() + self.assertRaises(KeyError, scheduling_queue_handler.process) + self.assertEqual(eventlet.GreenPool.spawn.call_count, 0) + + @mock.patch.object( + ex_q_db_access.ActionExecutionSchedulingQueue, 'query', + mock.MagicMock(side_effect=[pymongo.errors.ConnectionFailure(), [MOCK_QUEUE_ITEM]])) + @mock.patch.object( + ex_q_db_access.ActionExecutionSchedulingQueue, 'add_or_update', + mock.MagicMock(return_value=None)) + def test_handler_gc_retry_connection_error(self): + scheduling_queue_handler = handler.ActionExecutionSchedulingQueueHandler() + scheduling_queue_handler._handle_garbage_collection() + + # Make sure retry occurs and that add_or_update is called for the queue item.
+ calls = [mock.call(MOCK_QUEUE_ITEM, publish=False)] + ex_q_db_access.ActionExecutionSchedulingQueue.add_or_update.assert_has_calls(calls) + + @mock.patch.object( + ex_q_db_access.ActionExecutionSchedulingQueue, 'query', + mock.MagicMock(side_effect=[pymongo.errors.ConnectionFailure()] * 3)) + @mock.patch.object( + ex_q_db_access.ActionExecutionSchedulingQueue, 'add_or_update', + mock.MagicMock(return_value=None)) + def test_handler_gc_retries_exhausted(self): + scheduling_queue_handler = handler.ActionExecutionSchedulingQueueHandler() + + self.assertRaises( + pymongo.errors.ConnectionFailure, + scheduling_queue_handler._handle_garbage_collection + ) + + self.assertEqual(ex_q_db_access.ActionExecutionSchedulingQueue.add_or_update.call_count, 0) + + @mock.patch.object( + ex_q_db_access.ActionExecutionSchedulingQueue, 'query', + mock.MagicMock(side_effect=KeyError())) + @mock.patch.object( + ex_q_db_access.ActionExecutionSchedulingQueue, 'add_or_update', + mock.MagicMock(return_value=None)) + def test_handler_gc_unexpected_error(self): + scheduling_queue_handler = handler.ActionExecutionSchedulingQueueHandler() + + self.assertRaises( + KeyError, + scheduling_queue_handler._handle_garbage_collection + ) + + self.assertEqual(ex_q_db_access.ActionExecutionSchedulingQueue.add_or_update.call_count, 0) diff --git a/st2tests/st2tests/config.py b/st2tests/st2tests/config.py index f96aa774f3..84a29dc06f 100644 --- a/st2tests/st2tests/config.py +++ b/st2tests/st2tests/config.py @@ -251,6 +251,12 @@ def _register_scheduler_opts(): cfg.FloatOpt( 'gc_interval', default=5, help='How often to look for zombie executions before rescheduling them (in ms).'), + cfg.IntOpt( + 'retry_max_attempt', default=3, + help='The maximum number of attempts that the scheduler retries on error.'), + cfg.IntOpt( + 'retry_wait_msec', default=100, + help='The number of milliseconds to wait in between retries.') ] _register_opts(scheduler_opts, group='scheduler') From 
3898b26bacd896a403909a90fdaf75229f6c9144 Mon Sep 17 00:00:00 2001 From: W Chan Date: Wed, 13 Feb 2019 00:54:08 +0000 Subject: [PATCH 10/11] Fix scheduler test configs in unit tests Add or move the parsing of test configs to the top of affected test modules and make sure the scheduler default config options do not conflict with test configs. --- .../tests/unit/test_actionchain_cancel.py | 3 +++ .../tests/unit/test_actionchain_pause_resume.py | 3 +++ st2actions/st2actions/scheduler/config.py | 9 ++++++++- st2actions/tests/unit/test_scheduler.py | 6 +++--- st2actions/tests/unit/test_scheduler_entrypoint.py | 3 +++ 5 files changed, 20 insertions(+), 4 deletions(-) diff --git a/contrib/runners/action_chain_runner/tests/unit/test_actionchain_cancel.py b/contrib/runners/action_chain_runner/tests/unit/test_actionchain_cancel.py index 1e663f3dab..e53a1673ad 100644 --- a/contrib/runners/action_chain_runner/tests/unit/test_actionchain_cancel.py +++ b/contrib/runners/action_chain_runner/tests/unit/test_actionchain_cancel.py @@ -19,6 +19,9 @@ import os import tempfile +from st2tests import config as test_config +test_config.parse_args() + from st2common.bootstrap import actionsregistrar from st2common.bootstrap import runnersregistrar diff --git a/contrib/runners/action_chain_runner/tests/unit/test_actionchain_pause_resume.py b/contrib/runners/action_chain_runner/tests/unit/test_actionchain_pause_resume.py index 189d74ec34..0b25930a2f 100644 --- a/contrib/runners/action_chain_runner/tests/unit/test_actionchain_pause_resume.py +++ b/contrib/runners/action_chain_runner/tests/unit/test_actionchain_pause_resume.py @@ -19,6 +19,9 @@ import os import tempfile +from st2tests import config as test_config +test_config.parse_args() + from st2common.bootstrap import actionsregistrar from st2common.bootstrap import runnersregistrar diff --git a/st2actions/st2actions/scheduler/config.py b/st2actions/st2actions/scheduler/config.py index 552042bc0a..27edfd6634 100644 --- 
a/st2actions/st2actions/scheduler/config.py +++ b/st2actions/st2actions/scheduler/config.py @@ -19,6 +19,10 @@ from st2common import config as common_config from st2common.constants import system as sys_constants +from st2common import log as logging + + +LOG = logging.getLogger(__name__) def parse_args(args=None): @@ -67,4 +71,7 @@ def _register_service_opts(): cfg.CONF.register_opts(scheduler_opts, group='scheduler') -register_opts() +try: + register_opts() +except cfg.DuplicateOptError: + LOG.exception('The scheduler configuration options are already parsed and loaded.') diff --git a/st2actions/tests/unit/test_scheduler.py b/st2actions/tests/unit/test_scheduler.py index 96620ea5d8..41c0b437f3 100644 --- a/st2actions/tests/unit/test_scheduler.py +++ b/st2actions/tests/unit/test_scheduler.py @@ -19,6 +19,9 @@ import mock import eventlet +from st2tests import config as test_config +test_config.parse_args() + import st2common from st2tests import ExecutionDbTestCase from st2tests.fixturesloader import FixturesLoader @@ -39,9 +42,6 @@ from st2common.services import executions as execution_service from st2common.exceptions import db as db_exc -from st2tests import config as test_config -test_config.parse_args() - LIVE_ACTION = { 'parameters': { diff --git a/st2actions/tests/unit/test_scheduler_entrypoint.py b/st2actions/tests/unit/test_scheduler_entrypoint.py index 9e0282a610..65f6d2d8ed 100644 --- a/st2actions/tests/unit/test_scheduler_entrypoint.py +++ b/st2actions/tests/unit/test_scheduler_entrypoint.py @@ -16,6 +16,9 @@ import eventlet import mock +from st2tests import config as test_config +test_config.parse_args() + from st2actions.cmd.scheduler import _run_scheduler from st2actions.scheduler.handler import ActionExecutionSchedulingQueueHandler from st2actions.scheduler.entrypoint import SchedulerEntrypoint From 03d77754229e33dae4463814e20549691c4ffe86 Mon Sep 17 00:00:00 2001 From: W Chan Date: Wed, 13 Feb 2019 17:20:07 +0000 Subject: [PATCH 11/11] Include the 
scheduler retry and exit code fix in changelog --- CHANGELOG.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 7fb1e76a65..561e0f79f0 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -43,6 +43,8 @@ Changed * Moved the lock from concurrency policies into the scheduler to fix a race condition when there are multiple scheduler instances scheduling execution for action with concurrency policies. #4481 (bug fix) +* Add retries to the scheduler to handle temporary hiccups in the DB connection. Refactor the + scheduler service to return a proper exit code when there is a failure. #4539 (bug fix) Fixed ~~~~~