diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 7fb1e76a65..561e0f79f0 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -43,6 +43,8 @@ Changed * Moved the lock from concurrency policies into the scheduler to fix a race condition when there are multiple scheduler instances scheduling execution for action with concurrency policies. #4481 (bug fix) +* Add retries to scheduler to handle temporary hiccup in DB connection. Refactor scheduler + service to return proper exit code when there is a failure. #4539 (bug fix) Fixed ~~~~~ diff --git a/conf/st2.conf.sample b/conf/st2.conf.sample index 21db1278af..22ad407a37 100644 --- a/conf/st2.conf.sample +++ b/conf/st2.conf.sample @@ -257,14 +257,18 @@ thread_pool_size = 10 logging = /etc/st2/logging.rulesengine.conf [scheduler] -# How long (in seconds) to sleep between each action scheduler main loop run interval. -sleep_interval = 0.1 -# How often (in seconds) to look for zombie execution requests before rescheduling them. -gc_interval = 10 +# The maximum number of attempts that the scheduler retries on error. +retry_max_attempt = 10 # Location of the logging configuration file. logging = /etc/st2/logging.scheduler.conf +# How long (in seconds) to sleep between each action scheduler main loop run interval. +sleep_interval = 0.1 # The size of the pool used by the scheduler for scheduling executions. pool_size = 10 +# The number of milliseconds to wait in between retries. +retry_wait_msec = 3000 +# How often (in seconds) to look for zombie execution requests before rescheduling them. +gc_interval = 10 [schema] # Version of JSON schema to use. diff --git a/contrib/runners/action_chain_runner/tests/unit/test_actionchain_cancel.py b/contrib/runners/action_chain_runner/tests/unit/test_actionchain_cancel.py index 1e663f3dab..e53a1673ad 100644 --- a/contrib/runners/action_chain_runner/tests/unit/test_actionchain_cancel.py +++ b/contrib/runners/action_chain_runner/tests/unit/test_actionchain_cancel.py @@ -19,6 +19,9 @@ import os import tempfile +from st2tests import config as test_config +test_config.parse_args() + from st2common.bootstrap import actionsregistrar from st2common.bootstrap import runnersregistrar diff --git a/contrib/runners/action_chain_runner/tests/unit/test_actionchain_pause_resume.py b/contrib/runners/action_chain_runner/tests/unit/test_actionchain_pause_resume.py index 189d74ec34..0b25930a2f 100644 --- a/contrib/runners/action_chain_runner/tests/unit/test_actionchain_pause_resume.py +++ b/contrib/runners/action_chain_runner/tests/unit/test_actionchain_pause_resume.py @@ -19,6 +19,9 @@ import os import tempfile +from st2tests import config as test_config +test_config.parse_args() + from st2common.bootstrap import actionsregistrar from st2common.bootstrap import runnersregistrar diff --git a/st2actions/bin/runners.sh b/st2actions/bin/runners.sh old mode 100644 new mode 100755 diff --git a/st2actions/bin/st2scheduler b/st2actions/bin/st2scheduler old mode 100644 new mode 100755 diff --git a/st2actions/st2actions/cmd/scheduler.py b/st2actions/st2actions/cmd/scheduler.py index 1ae0096084..9e548d6ebb 100644 --- a/st2actions/st2actions/cmd/scheduler.py +++ b/st2actions/st2actions/cmd/scheduler.py @@ -1,6 +1,21 @@ +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. # Monkey patching should be done as early as possible. # See http://eventlet.net/doc/patching.html#monkeypatching-the-standard-library from __future__ import absolute_import + from st2common.util.monkey_patch import monkey_patch monkey_patch() @@ -36,7 +51,7 @@ def _setup(): _setup_sigterm_handler() -def _run_queuer(): +def _run_scheduler(): LOG.info('(PID=%s) Scheduler started.', os.getpid()) # Lazy load these so that decorator metrics are in place @@ -51,7 +66,9 @@ def _run_queuer(): try: handler.start() entrypoint.start() - entrypoint.wait() + + # Wait on handler first since entrypoint is more durable. + handler.wait() or entrypoint.wait() except (KeyboardInterrupt, SystemExit): LOG.info('(PID=%s) Scheduler stopped.', os.getpid()) @@ -68,6 +85,13 @@ def _run_queuer(): return 1 except: LOG.exception('(PID=%s) Scheduler unexpectedly stopped.', os.getpid()) + + try: + handler.shutdown() + entrypoint.shutdown() + except: + pass + return 1 return 0 @@ -80,7 +104,7 @@ def _teardown(): def main(): try: _setup() - return _run_queuer() + return _run_scheduler() except SystemExit as exit_code: sys.exit(exit_code) except: diff --git a/st2actions/st2actions/scheduler/config.py b/st2actions/st2actions/scheduler/config.py index ed7d7477be..27edfd6634 100644 --- a/st2actions/st2actions/scheduler/config.py +++ b/st2actions/st2actions/scheduler/config.py @@ -19,6 +19,10 @@ from st2common import config as common_config from st2common.constants import system as sys_constants +from st2common import log as logging + + +LOG = logging.getLogger(__name__) def parse_args(args=None): @@ -56,10 +60,18 @@ def _register_service_opts(): 'gc_interval', default=10, help='How often (in seconds) to look for zombie execution requests before rescheduling ' 'them.'), - + cfg.IntOpt( + 'retry_max_attempt', default=10, + help='The maximum number of attempts that the scheduler retries on error.'), + cfg.IntOpt( + 'retry_wait_msec', default=3000, + help='The number of milliseconds to wait in between retries.') ] cfg.CONF.register_opts(scheduler_opts, group='scheduler') -register_opts() +try: + register_opts() +except cfg.DuplicateOptError: + LOG.exception('The scheduler configuration options are already parsed and loaded.') diff --git a/st2actions/st2actions/scheduler/handler.py b/st2actions/st2actions/scheduler/handler.py index e93adf6ec9..74027b88ed 100644 --- a/st2actions/st2actions/scheduler/handler.py +++ b/st2actions/st2actions/scheduler/handler.py @@ -16,10 +16,12 @@ from __future__ import absolute_import import eventlet +import retrying from oslo_config import cfg from st2common import log as logging from st2common.util import date +from st2common.util import service as service_utils from st2common.constants import action as action_constants from st2common.constants import policy as policy_constants from st2common.exceptions.db import StackStormDBObjectNotFoundError @@ -60,25 +62,37 @@ def __init__(self): self._shutdown = False self._pool = eventlet.GreenPool(size=cfg.CONF.scheduler.pool_size) self._coordinator = coordination_service.get_coordinator() + self._main_thread = None + self._cleanup_thread = None def run(self): - LOG.debug('Entering scheduler loop') + LOG.debug('Starting scheduler handler...') while not self._shutdown: eventlet.greenthread.sleep(cfg.CONF.scheduler.sleep_interval) + self.process() - execution_queue_item_db = self._get_next_execution() + @retrying.retry( + retry_on_exception=service_utils.retry_on_exceptions, + stop_max_attempt_number=cfg.CONF.scheduler.retry_max_attempt, + wait_fixed=cfg.CONF.scheduler.retry_wait_msec) + def process(self): + execution_queue_item_db = self._get_next_execution() - if execution_queue_item_db: - self._pool.spawn(self._handle_execution, execution_queue_item_db) + if execution_queue_item_db: + self._pool.spawn(self._handle_execution, execution_queue_item_db) def cleanup(self): - LOG.debug('Starting scheduler garbage collection') + LOG.debug('Starting scheduler garbage collection...') while not self._shutdown: eventlet.greenthread.sleep(cfg.CONF.scheduler.gc_interval) self._handle_garbage_collection() + @retrying.retry( + retry_on_exception=service_utils.retry_on_exceptions, + stop_max_attempt_number=cfg.CONF.scheduler.retry_max_attempt, + wait_fixed=cfg.CONF.scheduler.retry_wait_msec) def _handle_garbage_collection(self): """ Periodically look for executions which have "handling" set to "True" and haven't been @@ -328,11 +342,24 @@ def _update_to_scheduled(liveaction_db, execution_queue_item_db): def start(self): self._shutdown = False - eventlet.spawn(self.run) - eventlet.spawn(self.cleanup) + # Spawn the worker threads. + self._main_thread = eventlet.spawn(self.run) + self._cleanup_thread = eventlet.spawn(self.cleanup) - def shutdown(self): - self._shutdown = True + # Link the threads to the shutdown function. If either of the threads exited with error, + # then initiate shutdown which will allow the waits below to throw exception to the + # main process. + self._main_thread.link(self.shutdown) + self._cleanup_thread.link(self.shutdown) + + def shutdown(self, *args, **kwargs): + if not self._shutdown: + self._shutdown = True + + def wait(self): + # Wait for the worker threads to complete. If there is an exception thrown in the thread, + # then the exception will be propagated to the main process for a proper return code. + self._main_thread.wait() or self._cleanup_thread.wait() def get_handler(): diff --git a/st2actions/tests/unit/test_scheduler.py b/st2actions/tests/unit/test_scheduler.py index 96620ea5d8..41c0b437f3 100644 --- a/st2actions/tests/unit/test_scheduler.py +++ b/st2actions/tests/unit/test_scheduler.py @@ -19,6 +19,9 @@ import mock import eventlet +from st2tests import config as test_config +test_config.parse_args() + import st2common from st2tests import ExecutionDbTestCase from st2tests.fixturesloader import FixturesLoader @@ -39,9 +42,6 @@ from st2common.services import executions as execution_service from st2common.exceptions import db as db_exc -from st2tests import config as test_config -test_config.parse_args() - LIVE_ACTION = { 'parameters': { diff --git a/st2actions/tests/unit/test_scheduler_entrypoint.py b/st2actions/tests/unit/test_scheduler_entrypoint.py new file mode 100644 index 0000000000..65f6d2d8ed --- /dev/null +++ b/st2actions/tests/unit/test_scheduler_entrypoint.py @@ -0,0 +1,83 @@ +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import eventlet +import mock + +from st2tests import config as test_config +test_config.parse_args() + +from st2actions.cmd.scheduler import _run_scheduler +from st2actions.scheduler.handler import ActionExecutionSchedulingQueueHandler +from st2actions.scheduler.entrypoint import SchedulerEntrypoint + +from st2tests.base import CleanDbTestCase + +__all__ = [ + 'SchedulerServiceEntryPointTestCase' +] + + +def mock_handler_run(self): + # NOTE: We use eventlet.sleep to emulate async nature of this process + eventlet.sleep(0.2) + raise Exception('handler run exception') + + +def mock_handler_cleanup(self): + # NOTE: We use eventlet.sleep to emulate async nature of this process + eventlet.sleep(0.2) + raise Exception('handler clean exception') + + +def mock_entrypoint_start(self): + # NOTE: We use eventlet.sleep to emulate async nature of this process + eventlet.sleep(0.2) + raise Exception('entrypoint start exception') + + +class SchedulerServiceEntryPointTestCase(CleanDbTestCase): + @mock.patch.object(ActionExecutionSchedulingQueueHandler, 'run', mock_handler_run) + @mock.patch('st2actions.cmd.scheduler.LOG') + def test_service_exits_correctly_on_fatal_exception_in_handler_run(self, mock_log): + run_thread = eventlet.spawn(_run_scheduler) + result = run_thread.wait() + + self.assertEqual(result, 1) + + mock_log_exception_call = mock_log.exception.call_args_list[0][0][0] + self.assertTrue('Scheduler unexpectedly stopped' in mock_log_exception_call) + + @mock.patch.object(ActionExecutionSchedulingQueueHandler, 'cleanup', mock_handler_cleanup) + @mock.patch('st2actions.cmd.scheduler.LOG') + def test_service_exits_correctly_on_fatal_exception_in_handler_cleanup(self, mock_log): + run_thread = eventlet.spawn(_run_scheduler) + result = run_thread.wait() + + self.assertEqual(result, 1) + + mock_log_exception_call = mock_log.exception.call_args_list[0][0][0] + self.assertTrue('Scheduler unexpectedly stopped' in mock_log_exception_call) + + @mock.patch.object(SchedulerEntrypoint, 'start', mock_entrypoint_start) + @mock.patch('st2actions.cmd.scheduler.LOG') + def test_service_exits_correctly_on_fatal_exception_in_entrypoint_start(self, mock_log): + run_thread = eventlet.spawn(_run_scheduler) + result = run_thread.wait() + + self.assertEqual(result, 1) + + mock_log_exception_call = mock_log.exception.call_args_list[0][0][0] + self.assertTrue('Scheduler unexpectedly stopped' in mock_log_exception_call) diff --git a/st2actions/tests/unit/test_scheduler_retry.py b/st2actions/tests/unit/test_scheduler_retry.py new file mode 100644 index 0000000000..51756e2ce0 --- /dev/null +++ b/st2actions/tests/unit/test_scheduler_retry.py @@ -0,0 +1,120 @@ +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import eventlet +import mock +import pymongo +import uuid + +from st2tests import config as test_config +test_config.parse_args() + +from st2actions.scheduler import handler +from st2common.models.db import execution_queue as ex_q_db +from st2common.persistence import execution_queue as ex_q_db_access +from st2tests.base import CleanDbTestCase + + +__all__ = [ + 'SchedulerHandlerRetryTestCase' +] + + +MOCK_QUEUE_ITEM = ex_q_db.ActionExecutionSchedulingQueueItemDB(liveaction_id=uuid.uuid4().hex) + + +class SchedulerHandlerRetryTestCase(CleanDbTestCase): + + @mock.patch.object( + handler.ActionExecutionSchedulingQueueHandler, '_get_next_execution', + mock.MagicMock(side_effect=[pymongo.errors.ConnectionFailure(), MOCK_QUEUE_ITEM])) + @mock.patch.object( + eventlet.GreenPool, 'spawn', + mock.MagicMock(return_value=None)) + def test_handler_retry_connection_error(self): + scheduling_queue_handler = handler.ActionExecutionSchedulingQueueHandler() + scheduling_queue_handler.process() + + # Make sure retry occurs and that _handle_execution in process is called. + calls = [mock.call(scheduling_queue_handler._handle_execution, MOCK_QUEUE_ITEM)] + eventlet.GreenPool.spawn.assert_has_calls(calls) + + @mock.patch.object( + handler.ActionExecutionSchedulingQueueHandler, '_get_next_execution', + mock.MagicMock(side_effect=[pymongo.errors.ConnectionFailure()] * 3)) + @mock.patch.object( + eventlet.GreenPool, 'spawn', + mock.MagicMock(return_value=None)) + def test_handler_retries_exhausted(self): + scheduling_queue_handler = handler.ActionExecutionSchedulingQueueHandler() + self.assertRaises(pymongo.errors.ConnectionFailure, scheduling_queue_handler.process) + self.assertEqual(eventlet.GreenPool.spawn.call_count, 0) + + @mock.patch.object( + handler.ActionExecutionSchedulingQueueHandler, '_get_next_execution', + mock.MagicMock(side_effect=KeyError())) + @mock.patch.object( + eventlet.GreenPool, 'spawn', + mock.MagicMock(return_value=None)) + def test_handler_retry_unexpected_error(self): + scheduling_queue_handler = handler.ActionExecutionSchedulingQueueHandler() + self.assertRaises(KeyError, scheduling_queue_handler.process) + self.assertEqual(eventlet.GreenPool.spawn.call_count, 0) + + @mock.patch.object( + ex_q_db_access.ActionExecutionSchedulingQueue, 'query', + mock.MagicMock(side_effect=[pymongo.errors.ConnectionFailure(), [MOCK_QUEUE_ITEM]])) + @mock.patch.object( + ex_q_db_access.ActionExecutionSchedulingQueue, 'add_or_update', + mock.MagicMock(return_value=None)) + def test_handler_gc_retry_connection_error(self): + scheduling_queue_handler = handler.ActionExecutionSchedulingQueueHandler() + scheduling_queue_handler._handle_garbage_collection() + + # Make sure retry occurs and that _handle_execution in process is called. + calls = [mock.call(MOCK_QUEUE_ITEM, publish=False)] + ex_q_db_access.ActionExecutionSchedulingQueue.add_or_update.assert_has_calls(calls) + + @mock.patch.object( + ex_q_db_access.ActionExecutionSchedulingQueue, 'query', + mock.MagicMock(side_effect=[pymongo.errors.ConnectionFailure()] * 3)) + @mock.patch.object( + ex_q_db_access.ActionExecutionSchedulingQueue, 'add_or_update', + mock.MagicMock(return_value=None)) + def test_handler_gc_retries_exhausted(self): + scheduling_queue_handler = handler.ActionExecutionSchedulingQueueHandler() + + self.assertRaises( + pymongo.errors.ConnectionFailure, + scheduling_queue_handler._handle_garbage_collection + ) + + self.assertEqual(ex_q_db_access.ActionExecutionSchedulingQueue.add_or_update.call_count, 0) + + @mock.patch.object( + ex_q_db_access.ActionExecutionSchedulingQueue, 'query', + mock.MagicMock(side_effect=KeyError())) + @mock.patch.object( + ex_q_db_access.ActionExecutionSchedulingQueue, 'add_or_update', + mock.MagicMock(return_value=None)) + def test_handler_gc_unexpected_error(self): + scheduling_queue_handler = handler.ActionExecutionSchedulingQueueHandler() + + self.assertRaises( + KeyError, + scheduling_queue_handler._handle_garbage_collection + ) + + self.assertEqual(ex_q_db_access.ActionExecutionSchedulingQueue.add_or_update.call_count, 0) diff --git a/st2common/st2common/util/service.py b/st2common/st2common/util/service.py new file mode 100644 index 0000000000..9e9a7df92d --- /dev/null +++ b/st2common/st2common/util/service.py @@ -0,0 +1,36 @@ +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import + +import pymongo + +from st2common import log as logging + + +LOG = logging.getLogger(__name__) + + +def retry_on_exceptions(exc): + LOG.warning('Evaluating retry on exception %s. %s', type(exc), str(exc)) + + is_mongo_connection_error = isinstance(exc, pymongo.errors.ConnectionFailure) + + retrying = is_mongo_connection_error + + if retrying: + LOG.warning('Retrying on exception %s.', type(exc)) + + return retrying diff --git a/st2tests/st2tests/config.py b/st2tests/st2tests/config.py index 60b9480d22..c8cda2cd86 100644 --- a/st2tests/st2tests/config.py +++ b/st2tests/st2tests/config.py @@ -278,6 +278,12 @@ def _register_scheduler_opts(): cfg.FloatOpt( 'gc_interval', default=5, help='How often to look for zombie executions before rescheduling them (in ms).'), + cfg.IntOpt( + 'retry_max_attempt', default=3, + help='The maximum number of attempts that the scheduler retries on error.'), + cfg.IntOpt( + 'retry_wait_msec', default=100, + help='The number of milliseconds to wait in between retries.') ] _register_opts(scheduler_opts, group='scheduler')