Skip to content
Merged
2 changes: 2 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ Changed
* Moved the lock from concurrency policies into the scheduler to fix a race condition when there
are multiple scheduler instances scheduling execution for action with concurrency policies.
#4481 (bug fix)
* Added retries to the scheduler to handle temporary hiccups in the DB connection. Refactored the
scheduler service to return a proper exit code when there is a failure. #4539 (bug fix)

Fixed
~~~~~
Expand Down
12 changes: 8 additions & 4 deletions conf/st2.conf.sample
Original file line number Diff line number Diff line change
Expand Up @@ -257,14 +257,18 @@ thread_pool_size = 10
logging = /etc/st2/logging.rulesengine.conf

[scheduler]
# How long (in seconds) to sleep between each action scheduler main loop run interval.
sleep_interval = 0.1
# How often (in seconds) to look for zombie execution requests before rescheduling them.
gc_interval = 10
# The maximum number of attempts that the scheduler retries on error.
retry_max_attempt = 10
# Location of the logging configuration file.
logging = /etc/st2/logging.scheduler.conf
# How long (in seconds) to sleep between each action scheduler main loop run interval.
sleep_interval = 0.1
# The size of the pool used by the scheduler for scheduling executions.
pool_size = 10
# The number of milliseconds to wait in between retries.
retry_wait_msec = 3000
# How often (in seconds) to look for zombie execution requests before rescheduling them.
gc_interval = 10

[schema]
# Version of JSON schema to use.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import os
import tempfile

from st2tests import config as test_config
test_config.parse_args()

from st2common.bootstrap import actionsregistrar
from st2common.bootstrap import runnersregistrar

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import os
import tempfile

from st2tests import config as test_config
test_config.parse_args()

from st2common.bootstrap import actionsregistrar
from st2common.bootstrap import runnersregistrar

Expand Down
Empty file modified st2actions/bin/runners.sh
100644 → 100755
Empty file.
Empty file modified st2actions/bin/st2scheduler
100644 → 100755
Empty file.
30 changes: 27 additions & 3 deletions st2actions/st2actions/cmd/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,21 @@
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Monkey patching should be done as early as possible.
# See http://eventlet.net/doc/patching.html#monkeypatching-the-standard-library
from __future__ import absolute_import

from st2common.util.monkey_patch import monkey_patch
monkey_patch()

Expand Down Expand Up @@ -36,7 +51,7 @@ def _setup():
_setup_sigterm_handler()


def _run_queuer():
def _run_scheduler():
LOG.info('(PID=%s) Scheduler started.', os.getpid())

# Lazy load these so that decorator metrics are in place
Expand All @@ -51,7 +66,9 @@ def _run_queuer():
try:
handler.start()
entrypoint.start()
entrypoint.wait()

# Wait on handler first since entrypoint is more durable.
handler.wait() or entrypoint.wait()
except (KeyboardInterrupt, SystemExit):
LOG.info('(PID=%s) Scheduler stopped.', os.getpid())

Expand All @@ -68,6 +85,13 @@ def _run_queuer():
return 1
except:
LOG.exception('(PID=%s) Scheduler unexpectedly stopped.', os.getpid())

try:
handler.shutdown()
entrypoint.shutdown()
except:
pass

return 1

return 0
Expand All @@ -80,7 +104,7 @@ def _teardown():
def main():
try:
_setup()
return _run_queuer()
return _run_scheduler()
except SystemExit as exit_code:
sys.exit(exit_code)
except:
Expand Down
16 changes: 14 additions & 2 deletions st2actions/st2actions/scheduler/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

from st2common import config as common_config
from st2common.constants import system as sys_constants
from st2common import log as logging


LOG = logging.getLogger(__name__)


def parse_args(args=None):
Expand Down Expand Up @@ -56,10 +60,18 @@ def _register_service_opts():
'gc_interval', default=10,
help='How often (in seconds) to look for zombie execution requests before rescheduling '
'them.'),

cfg.IntOpt(
'retry_max_attempt', default=10,
help='The maximum number of attempts that the scheduler retries on error.'),
cfg.IntOpt(
'retry_wait_msec', default=3000,
help='The number of milliseconds to wait in between retries.')
]

cfg.CONF.register_opts(scheduler_opts, group='scheduler')


register_opts()
try:
register_opts()
except cfg.DuplicateOptError:
LOG.exception('The scheduler configuration options are already parsed and loaded.')
45 changes: 36 additions & 9 deletions st2actions/st2actions/scheduler/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
from __future__ import absolute_import

import eventlet
import retrying
from oslo_config import cfg

from st2common import log as logging
from st2common.util import date
from st2common.util import service as service_utils
from st2common.constants import action as action_constants
from st2common.constants import policy as policy_constants
from st2common.exceptions.db import StackStormDBObjectNotFoundError
Expand Down Expand Up @@ -60,25 +62,37 @@ def __init__(self):
self._shutdown = False
self._pool = eventlet.GreenPool(size=cfg.CONF.scheduler.pool_size)
self._coordinator = coordination_service.get_coordinator()
self._main_thread = None
self._cleanup_thread = None

def run(self):
LOG.debug('Entering scheduler loop')
LOG.debug('Starting scheduler handler...')

while not self._shutdown:
eventlet.greenthread.sleep(cfg.CONF.scheduler.sleep_interval)
self.process()

execution_queue_item_db = self._get_next_execution()
@retrying.retry(
retry_on_exception=service_utils.retry_on_exceptions,
stop_max_attempt_number=cfg.CONF.scheduler.retry_max_attempt,
wait_fixed=cfg.CONF.scheduler.retry_wait_msec)
def process(self):
execution_queue_item_db = self._get_next_execution()

if execution_queue_item_db:
self._pool.spawn(self._handle_execution, execution_queue_item_db)
if execution_queue_item_db:
self._pool.spawn(self._handle_execution, execution_queue_item_db)

def cleanup(self):
LOG.debug('Starting scheduler garbage collection')
LOG.debug('Starting scheduler garbage collection...')

while not self._shutdown:
eventlet.greenthread.sleep(cfg.CONF.scheduler.gc_interval)
self._handle_garbage_collection()

@retrying.retry(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm personally still not too sure about this retry here. It seems like a one-off, i.e. we don't do it in other similar services.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides that, the linking looks good to me; let's please just add some test cases for it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think having the retries there is ok. For a single server install w/o any complex service management, this prevents temporary hiccups with the MongoDB connection. If users want to fail fast, they can reconfigure the retries. If you're worried about consistency, we can revisit. After this issue, our service pattern needs an overhaul.

retry_on_exception=service_utils.retry_on_exceptions,
stop_max_attempt_number=cfg.CONF.scheduler.retry_max_attempt,
wait_fixed=cfg.CONF.scheduler.retry_wait_msec)
def _handle_garbage_collection(self):
"""
Periodically look for executions which have "handling" set to "True" and haven't been
Expand Down Expand Up @@ -328,11 +342,24 @@ def _update_to_scheduled(liveaction_db, execution_queue_item_db):
def start(self):
self._shutdown = False

eventlet.spawn(self.run)
eventlet.spawn(self.cleanup)
# Spawn the worker threads.
self._main_thread = eventlet.spawn(self.run)
self._cleanup_thread = eventlet.spawn(self.cleanup)

def shutdown(self):
self._shutdown = True
# Link the threads to the shutdown function. If either of the threads exited with error,
# then initiate shutdown which will allow the waits below to throw exception to the
# main process.
self._main_thread.link(self.shutdown)
self._cleanup_thread.link(self.shutdown)

def shutdown(self, *args, **kwargs):
if not self._shutdown:
self._shutdown = True

def wait(self):
# Wait for the worker threads to complete. If there is an exception thrown in the thread,
# then the exception will be propagated to the main process for a proper return code.
self._main_thread.wait() or self._cleanup_thread.wait()


def get_handler():
Expand Down
6 changes: 3 additions & 3 deletions st2actions/tests/unit/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import mock
import eventlet

from st2tests import config as test_config
test_config.parse_args()

import st2common
from st2tests import ExecutionDbTestCase
from st2tests.fixturesloader import FixturesLoader
Expand All @@ -39,9 +42,6 @@
from st2common.services import executions as execution_service
from st2common.exceptions import db as db_exc

from st2tests import config as test_config
test_config.parse_args()


LIVE_ACTION = {
'parameters': {
Expand Down
83 changes: 83 additions & 0 deletions st2actions/tests/unit/test_scheduler_entrypoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import eventlet
import mock

from st2tests import config as test_config
test_config.parse_args()

from st2actions.cmd.scheduler import _run_scheduler
from st2actions.scheduler.handler import ActionExecutionSchedulingQueueHandler
from st2actions.scheduler.entrypoint import SchedulerEntrypoint

from st2tests.base import CleanDbTestCase

__all__ = [
'SchedulerServiceEntryPointTestCase'
]


def mock_handler_run(self):
    """Replacement for the scheduling queue handler's ``run`` that always fails.

    Patched over ``ActionExecutionSchedulingQueueHandler.run`` by the tests in
    this module.
    """
    # Sleep through eventlet (not time.sleep) to emulate the async nature of
    # the real loop before the failure is raised.
    delay_seconds = 0.2
    eventlet.sleep(delay_seconds)

    failure = Exception('handler run exception')
    raise failure


def mock_handler_cleanup(self):
    """Replacement for the scheduling queue handler's ``cleanup`` that always fails.

    Patched over ``ActionExecutionSchedulingQueueHandler.cleanup`` by the tests
    in this module.
    """
    # Sleep through eventlet (not time.sleep) to emulate the async nature of
    # the real loop before the failure is raised.
    delay_seconds = 0.2
    eventlet.sleep(delay_seconds)

    failure = Exception('handler clean exception')
    raise failure


def mock_entrypoint_start(self):
    """Replacement for the scheduler entrypoint's ``start`` that always fails.

    Patched over ``SchedulerEntrypoint.start`` by the tests in this module.
    """
    # Sleep through eventlet (not time.sleep) to emulate the async nature of
    # the real start-up before the failure is raised.
    delay_seconds = 0.2
    eventlet.sleep(delay_seconds)

    failure = Exception('entrypoint start exception')
    raise failure


class SchedulerServiceEntryPointTestCase(CleanDbTestCase):
    """Check that ``_run_scheduler`` reports failure when one of its parts raises.

    Each test patches a single method of the scheduler service with a stub that
    raises after a short eventlet sleep, runs ``_run_scheduler`` in a green
    thread, and expects exit code ``1`` plus an "unexpectedly stopped" log entry.
    """

    def _assert_scheduler_exits_with_error(self, mock_log):
        """Run the scheduler service and assert that it returns a failure code.

        :param mock_log: Mock patched over ``st2actions.cmd.scheduler.LOG``; its
                         ``exception`` calls are inspected for the failure message.
        """
        run_thread = eventlet.spawn(_run_scheduler)
        result = run_thread.wait()

        self.assertEqual(result, 1)

        # Check that something was logged before indexing into the call list, so
        # a missing log call shows up as a readable assertion failure instead of
        # an IndexError.
        self.assertTrue(mock_log.exception.called,
                        'LOG.exception was not called by the scheduler service')

        # First positional argument of the first LOG.exception call is the
        # message format string.
        logged_message = mock_log.exception.call_args_list[0][0][0]
        self.assertIn('Scheduler unexpectedly stopped', logged_message)

    @mock.patch.object(ActionExecutionSchedulingQueueHandler, 'run', mock_handler_run)
    @mock.patch('st2actions.cmd.scheduler.LOG')
    def test_service_exits_correctly_on_fatal_exception_in_handler_run(self, mock_log):
        self._assert_scheduler_exits_with_error(mock_log)

    @mock.patch.object(ActionExecutionSchedulingQueueHandler, 'cleanup', mock_handler_cleanup)
    @mock.patch('st2actions.cmd.scheduler.LOG')
    def test_service_exits_correctly_on_fatal_exception_in_handler_cleanup(self, mock_log):
        self._assert_scheduler_exits_with_error(mock_log)

    @mock.patch.object(SchedulerEntrypoint, 'start', mock_entrypoint_start)
    @mock.patch('st2actions.cmd.scheduler.LOG')
    def test_service_exits_correctly_on_fatal_exception_in_entrypoint_start(self, mock_log):
        self._assert_scheduler_exits_with_error(mock_log)
Loading