diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 7199e85d5e..b93d5f05cc 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -12,8 +12,10 @@ Added #4757 * Add ``user`` parameter to ``re_run`` method of st2client. #4785 * Install pack dependencies automatically. #4769 -* Add support for `immutable_parameters` on Action Aliases. This feature allows default parameters to be supplied to the action on every execution of the alias. #4786 -* Add ``get_entrypoint()`` method to ``ActionResourceManager`` attribute of st2client. #4791 +* Add support for `immutable_parameters` on Action Aliases. This feature allows default + parameters to be supplied to the action on every execution of the alias. #4786 +* Add ``get_entrypoint()`` method to ``ActionResourceManager`` attribute of st2client. + #4791 Changed ~~~~~~~ @@ -34,6 +36,9 @@ Changed (improvement) Reported and contributed by Joshua Meyer (@jdmeyer3) #4803 +* Add new ``actionrunner.pip_opts`` st2.conf config option which allows users to specify a list + of command line options which are passed to ``pip install`` command when installing pack + dependencies into a pack specific virtual environment. #4792 Fixed ~~~~~ diff --git a/conf/st2.conf.sample b/conf/st2.conf.sample index b3a7262a46..50e8042baf 100644 --- a/conf/st2.conf.sample +++ b/conf/st2.conf.sample @@ -8,6 +8,8 @@ enable = True emit_when = succeeded,failed,timeout,canceled,abandoned # comma separated list allowed here. [actionrunner] +# List of pip options to be passed to "pip install" command when installing pack dependencies into pack virtual environment. +pip_opts = # comma separated list allowed here. # Internal pool size for dispatcher used by regular actions. actions_pool_size = 60 # Default log level to use for Python runner actions. Can be overriden on invocation basis using "log_level" runner parameter. 
diff --git a/st2common/st2common/config.py b/st2common/st2common/config.py index 3bbd5725c1..4b89c08650 100644 --- a/st2common/st2common/config.py +++ b/st2common/st2common/config.py @@ -374,6 +374,10 @@ def register_opts(ignore_errors=False): 'virtualenv_opts', default=['--system-site-packages'], help='List of virtualenv options to be passsed to "virtualenv" command that ' 'creates pack virtualenv.'), + cfg.ListOpt( + 'pip_opts', default=[], + help='List of pip options to be passed to "pip install" command when installing pack ' + 'dependencies into pack virtual environment.'), cfg.BoolOpt( 'stream_output', default=True, help='True to store and stream action output (stdout and stderr) in real-time.'), diff --git a/st2common/st2common/log.py b/st2common/st2common/log.py index cf1347ea81..15c25002a5 100644 --- a/st2common/st2common/log.py +++ b/st2common/st2common/log.py @@ -181,10 +181,22 @@ def _redirect_stderr(): sys.stderr = LoggingStream('STDERR') -def setup(config_file, redirect_stderr=True, excludes=None, disable_existing_loggers=False): +def setup(config_file, redirect_stderr=True, excludes=None, disable_existing_loggers=False, + st2_conf_path=None): """ Configure logging from file. + + :param st2_conf_path: Optional path to st2.conf file. If provided and "config_file" path is + relative to st2.conf path, the config_file path will get resolved to full + absolute path relative to st2.conf. 
+ :type st2_conf_path: ``str`` """ + if st2_conf_path and config_file[:2] == './' and not os.path.isfile(config_file): + # Logging config path is relative to st2.conf, resolve it to full absolute path + directory = os.path.dirname(st2_conf_path) + config_file_name = os.path.basename(config_file) + config_file = os.path.join(directory, config_file_name) + try: logging.config.fileConfig(config_file, defaults=None, diff --git a/st2common/st2common/util/concurrency.py b/st2common/st2common/util/concurrency.py index 62544eec3a..c40d2c311d 100644 --- a/st2common/st2common/util/concurrency.py +++ b/st2common/st2common/util/concurrency.py @@ -41,7 +41,9 @@ 'spawn', 'wait', 'cancel', - 'kill' + 'kill', + 'sleep', + 'get_greenlet_exit_exception_class' ] @@ -111,3 +113,21 @@ def kill(green_thread, *args, **kwargs): return green_thread.kill(*args, **kwargs) else: raise ValueError('Unsupported concurrency library') + + +def sleep(*args, **kwargs): + if CONCURRENCY_LIBRARY == 'eventlet': + return eventlet.sleep(*args, **kwargs) + elif CONCURRENCY_LIBRARY == 'gevent': + return gevent.sleep(*args, **kwargs) + else: + raise ValueError('Unsupported concurrency library') + + +def get_greenlet_exit_exception_class(): + if CONCURRENCY_LIBRARY == 'eventlet': + return eventlet.support.greenlets.GreenletExit + elif CONCURRENCY_LIBRARY == 'gevent': + return gevent.GreenletExit + else: + raise ValueError('Unsupported concurrency library') diff --git a/st2common/st2common/util/virtualenvs.py b/st2common/st2common/util/virtualenvs.py index b1eacad31d..0ac1c5977c 100644 --- a/st2common/st2common/util/virtualenvs.py +++ b/st2common/st2common/util/virtualenvs.py @@ -158,7 +158,7 @@ def create_virtualenv(virtualenv_path, logger=None, include_pip=True, include_se python_binary = cfg.CONF.actionrunner.python_binary python3_binary = cfg.CONF.actionrunner.python3_binary virtualenv_binary = cfg.CONF.actionrunner.virtualenv_binary - virtualenv_opts = cfg.CONF.actionrunner.virtualenv_opts + 
virtualenv_opts = cfg.CONF.actionrunner.virtualenv_opts or [] if not os.path.isfile(python_binary): raise Exception('Python binary "%s" doesn\'t exist' % (python_binary)) @@ -237,6 +237,7 @@ def install_requirements(virtualenv_path, requirements_file_path, proxy_config=N """ logger = logger or LOG pip_path = os.path.join(virtualenv_path, 'bin/pip') + pip_opts = cfg.CONF.actionrunner.pip_opts or [] cmd = [pip_path] if proxy_config: @@ -253,7 +254,10 @@ def install_requirements(virtualenv_path, requirements_file_path, proxy_config=N if cert: cmd.extend(['--cert', cert]) - cmd.extend(['install', '-U', '-r', requirements_file_path]) + cmd.append('install') + cmd.extend(pip_opts) + cmd.extend(['-U', '-r', requirements_file_path]) + env = get_env_for_subprocess_command() logger.debug('Installing requirements from file %s with command %s.', @@ -278,6 +282,7 @@ def install_requirement(virtualenv_path, requirement, proxy_config=None, logger= """ logger = logger or LOG pip_path = os.path.join(virtualenv_path, 'bin/pip') + pip_opts = cfg.CONF.actionrunner.pip_opts or [] cmd = [pip_path] if proxy_config: @@ -294,7 +299,9 @@ def install_requirement(virtualenv_path, requirement, proxy_config=None, logger= if cert: cmd.extend(['--cert', cert]) - cmd.extend(['install', requirement]) + cmd.append('install') + cmd.extend(pip_opts) + cmd.extend([requirement]) env = get_env_for_subprocess_command() logger.debug('Installing requirement %s with command %s.', requirement, ' '.join(cmd)) diff --git a/st2reactor/st2reactor/cmd/timersengine.py b/st2reactor/st2reactor/cmd/timersengine.py index 71d3fb29c5..b91dcaefcb 100644 --- a/st2reactor/st2reactor/cmd/timersengine.py +++ b/st2reactor/st2reactor/cmd/timersengine.py @@ -13,13 +13,14 @@ # limitations under the License. 
from __future__ import absolute_import + import os import sys -import eventlet from oslo_config import cfg from st2common import log as logging +from st2common.util import concurrency from st2common.constants.timer import TIMER_ENABLED_LOG_LINE, TIMER_DISABLED_LOG_LINE from st2common.logging.misc import get_logger_name_for_module from st2common.service_setup import setup as common_setup @@ -61,7 +62,7 @@ def _run_worker(): if cfg.CONF.timer.enable or cfg.CONF.timersengine.enable: local_tz = cfg.CONF.timer.local_timezone or cfg.CONF.timersengine.local_timezone timer = St2Timer(local_timezone=local_tz) - timer_thread = eventlet.spawn(_kickoff_timer, timer) + timer_thread = concurrency.spawn(_kickoff_timer, timer) LOG.info(TIMER_ENABLED_LOG_LINE) return timer_thread.wait() else: @@ -84,7 +85,7 @@ def main(): return _run_worker() except SystemExit as exit_code: sys.exit(exit_code) - except: + except Exception: LOG.exception('(PID=%s) TimerEngine quit due to exception.', os.getpid()) return 1 finally: diff --git a/st2reactor/st2reactor/container/manager.py b/st2reactor/st2reactor/container/manager.py index 917481ddd0..6942c78a81 100644 --- a/st2reactor/st2reactor/container/manager.py +++ b/st2reactor/st2reactor/container/manager.py @@ -13,13 +13,13 @@ # limitations under the License. 
from __future__ import absolute_import + import os import sys import signal -import eventlet - from st2common import log as logging +from st2common.util import concurrency from st2reactor.container.process_container import ProcessSensorContainer from st2common.services.sensor_watcher import SensorWatcher from st2common.models.system.common import ResourceReference @@ -75,7 +75,7 @@ def _spin_container_and_wait(self, sensors): self._sensor_container = ProcessSensorContainer( sensors=sensors, single_sensor_mode=self._single_sensor_mode) - self._container_thread = eventlet.spawn(self._sensor_container.run) + self._container_thread = concurrency.spawn(self._sensor_container.run) LOG.debug('Starting sensor CUD watcher...') self._sensors_watcher.start() @@ -90,7 +90,7 @@ def _spin_container_and_wait(self, sensors): LOG.info('(PID:%s) SensorContainer stopped. Reason - %s', os.getpid(), sys.exc_info()[0].__name__) - eventlet.kill(self._container_thread) + concurrency.kill(self._container_thread) self._container_thread = None return exit_code diff --git a/st2reactor/st2reactor/container/process_container.py b/st2reactor/st2reactor/container/process_container.py index 81fe9654ac..592447b811 100644 --- a/st2reactor/st2reactor/container/process_container.py +++ b/st2reactor/st2reactor/container/process_container.py @@ -13,6 +13,7 @@ # limitations under the License. 
from __future__ import absolute_import + import os import sys import time @@ -22,11 +23,10 @@ from collections import defaultdict import six -import eventlet -from eventlet.support import greenlets as greenlet from oslo_config import cfg from st2common import log as logging +from st2common.util import concurrency from st2common.constants.error_messages import PACK_VIRTUALENV_DOESNT_EXIST from st2common.constants.error_messages import PACK_VIRTUALENV_USES_PYTHON3 from st2common.constants.system import API_URL_ENV_VARIABLE_NAME @@ -79,16 +79,26 @@ class ProcessSensorContainer(object): Sensor container which runs sensors in a separate process. """ - def __init__(self, sensors, poll_interval=5, single_sensor_mode=False, dispatcher=None): + def __init__(self, sensors, poll_interval=5, single_sensor_mode=False, dispatcher=None, + wrapper_script_path=WRAPPER_SCRIPT_PATH, create_token=True): """ :param sensors: A list of sensor dicts. :type sensors: ``list`` of ``dict`` :param poll_interval: How long to sleep between each poll for running / dead sensors. :type poll_interval: ``float`` + + :param wrapper_script_path: Path to the sensor wrapper script. + :type wrapper_script_path: ``str`` + + :param create_token: True to create a temporary authentication token for each + sensor process and add it to that process's environment variables. 
+ :type create_token: ``bool`` """ self._poll_interval = poll_interval self._single_sensor_mode = single_sensor_mode + self._wrapper_script_path = wrapper_script_path + self._create_token = create_token if self._single_sensor_mode: # For more immediate feedback we use lower poll interval when running in single sensor @@ -98,9 +108,7 @@ def __init__(self, sensors, poll_interval=5, single_sensor_mode=False, dispatche self._sensors = {} # maps sensor_id -> sensor object self._processes = {} # maps sensor_id -> sensor process - if not dispatcher: - dispatcher = TriggerDispatcher(LOG) - self._dispatcher = dispatcher + self._dispatcher = dispatcher or TriggerDispatcher(LOG) self._stopped = False self._exit_code = None # exit code with which this process should exit @@ -129,6 +137,8 @@ def __init__(self, sensors, poll_interval=5, single_sensor_mode=False, dispatche def run(self): self._run_all_sensors() + success_exception_cls = concurrency.get_greenlet_exit_exception_class() + try: while not self._stopped: # Poll for all running processes @@ -140,8 +150,8 @@ def run(self): else: LOG.debug('No active sensors') - eventlet.sleep(self._poll_interval) - except greenlet.GreenletExit: + concurrency.sleep(self._poll_interval) + except success_exception_cls: # This exception is thrown when sensor container manager # kills the thread which runs process container. Not sure # if this is the best thing to do. 
@@ -180,8 +190,8 @@ def _poll_sensors_for_results(self, sensor_ids): # Try to respawn a dead process (maybe it was a simple failure which can be # resolved with a restart) - eventlet.spawn_n(self._respawn_sensor, sensor_id=sensor_id, sensor=sensor, - exit_code=status) + concurrency.spawn(self._respawn_sensor, sensor_id=sensor_id, sensor=sensor, + exit_code=status) else: sensor_start_time = self._sensor_start_times[sensor_id] sensor_respawn_count = self._sensor_respawn_counts[sensor_id] @@ -294,26 +304,7 @@ def _spawn_sensor_process(self, sensor): msg = PACK_VIRTUALENV_USES_PYTHON3 % format_values raise Exception(msg) - trigger_type_refs = sensor['trigger_types'] or [] - trigger_type_refs = ','.join(trigger_type_refs) - - parent_args = json.dumps(sys.argv[1:]) - - args = [ - python_path, - WRAPPER_SCRIPT_PATH, - '--pack=%s' % (sensor['pack']), - '--file-path=%s' % (sensor['file_path']), - '--class-name=%s' % (sensor['class_name']), - '--trigger-type-refs=%s' % (trigger_type_refs), - '--parent-args=%s' % (parent_args) - ] - - if sensor['poll_interval']: - args.append('--poll-interval=%s' % (sensor['poll_interval'])) - - sandbox_python_path = get_sandbox_python_path(inherit_from_parent=True, - inherit_parent_virtualenv=True) + args = self._get_args_for_wrapper_script(python_binary=python_path, sensor=sensor) if self._enable_common_pack_libs: pack_common_libs_path = get_pack_common_libs_path_for_pack_ref(pack_ref=pack_ref) @@ -322,27 +313,34 @@ def _spawn_sensor_process(self, sensor): env = os.environ.copy() + sandbox_python_path = get_sandbox_python_path(inherit_from_parent=True, + inherit_parent_virtualenv=True) + if self._enable_common_pack_libs and pack_common_libs_path: env['PYTHONPATH'] = pack_common_libs_path + ':' + sandbox_python_path else: env['PYTHONPATH'] = sandbox_python_path - # Include full api URL and API token specific to that sensor - ttl = cfg.CONF.auth.service_token_ttl - metadata = { - 'service': 'sensors_container', - 'sensor_path': 
sensor['file_path'], - 'sensor_class': sensor['class_name'] - } - temporary_token = create_token(username='sensors_container', ttl=ttl, metadata=metadata, - service=True) + if self._create_token: + # Include full api URL and API token specific to that sensor + LOG.debug('Creating temporary auth token for sensor %s' % (sensor['class_name'])) - env[API_URL_ENV_VARIABLE_NAME] = get_full_public_api_url() - env[AUTH_TOKEN_ENV_VARIABLE_NAME] = temporary_token.token + ttl = cfg.CONF.auth.service_token_ttl + metadata = { + 'service': 'sensors_container', + 'sensor_path': sensor['file_path'], + 'sensor_class': sensor['class_name'] + } + temporary_token = create_token(username='sensors_container', ttl=ttl, metadata=metadata, + service=True) + + env[API_URL_ENV_VARIABLE_NAME] = get_full_public_api_url() + env[AUTH_TOKEN_ENV_VARIABLE_NAME] = temporary_token.token + + # TODO 1: Purge temporary token when service stops or sensor process dies + # TODO 2: Store metadata (wrapper process id) with the token and delete + # tokens for old, dead processes on startup - # TODO 1: Purge temporary token when service stops or sensor process dies - # TODO 2: Store metadata (wrapper process id) with the token and delete - # tokens for old, dead processes on startup cmd = ' '.join(args) LOG.debug('Running sensor subprocess (cmd="%s")', cmd) @@ -434,7 +432,7 @@ def _respawn_sensor(self, sensor_id, sensor, exit_code): self._sensor_respawn_counts[sensor_id] += 1 sleep_delay = (SENSOR_RESPAWN_DELAY * self._sensor_respawn_counts[sensor_id]) - eventlet.sleep(sleep_delay) + concurrency.sleep(sleep_delay) try: self._spawn_sensor_process(sensor=sensor) @@ -459,6 +457,38 @@ def _should_respawn_sensor(self, sensor_id, sensor, exit_code): return True + def _get_args_for_wrapper_script(self, python_binary, sensor): + """ + Return CLI arguments passed to the sensor wrapper script. + + :param python_binary: Python binary used to execute wrapper script. 
+ :type python_binary: ``str`` + + :param sensor: Sensor object dictionary. + :type sensor: ``dict`` + + :rtype: ``list`` + """ + trigger_type_refs = sensor['trigger_types'] or [] + trigger_type_refs = ','.join(trigger_type_refs) + + parent_args = json.dumps(sys.argv[1:]) + + args = [ + python_binary, + self._wrapper_script_path, + '--pack=%s' % (sensor['pack']), + '--file-path=%s' % (sensor['file_path']), + '--class-name=%s' % (sensor['class_name']), + '--trigger-type-refs=%s' % (trigger_type_refs), + '--parent-args=%s' % (parent_args) + ] + + if sensor['poll_interval']: + args.append('--poll-interval=%s' % (sensor['poll_interval'])) + + return args + def _get_sensor_id(self, sensor): """ Return unique identifier for the provider sensor dict. diff --git a/st2reactor/st2reactor/garbage_collector/base.py b/st2reactor/st2reactor/garbage_collector/base.py index ae9c747b6f..c1d4c14ab1 100644 --- a/st2reactor/st2reactor/garbage_collector/base.py +++ b/st2reactor/st2reactor/garbage_collector/base.py @@ -23,11 +23,10 @@ import random import six -import eventlet -from eventlet.support import greenlets as greenlet from oslo_config import cfg from st2common import log as logging +from st2common.util import concurrency from st2common.constants.exit_codes import SUCCESS_EXIT_CODE from st2common.constants.exit_codes import FAILURE_EXIT_CODE from st2common.constants.garbage_collection import DEFAULT_COLLECTION_INTERVAL @@ -81,11 +80,13 @@ def run(self): # Wait a couple of seconds before performing initial collection to prevent thundering herd # effect when restarting multiple services at the same time jitter_seconds = random.uniform(0, 3) - eventlet.sleep(jitter_seconds) + concurrency.sleep(jitter_seconds) + + success_exception_cls = concurrency.get_greenlet_exit_exception_class() try: self._main_loop() - except greenlet.GreenletExit: + except success_exception_cls: self._running = False return SUCCESS_EXIT_CODE except Exception as e: @@ -111,7 +112,7 @@ def _main_loop(self): 
LOG.info('Sleeping for %s seconds before next garbage collection...' % (self._collection_interval)) - eventlet.sleep(self._collection_interval) + concurrency.sleep(self._collection_interval) def _validate_ttl_values(self): """ @@ -142,7 +143,7 @@ def _perform_garbage_collection(self): if self._action_executions_ttl and self._action_executions_ttl >= MINIMUM_TTL_DAYS: LOG.info(proc_message, obj_type) self._purge_action_executions() - eventlet.sleep(self._sleep_delay) + concurrency.sleep(self._sleep_delay) else: LOG.debug(skip_message, obj_type) @@ -151,7 +152,7 @@ def _perform_garbage_collection(self): self._action_executions_output_ttl >= MINIMUM_TTL_DAYS_EXECUTION_OUTPUT: LOG.info(proc_message, obj_type) self._purge_action_executions_output() - eventlet.sleep(self._sleep_delay) + concurrency.sleep(self._sleep_delay) else: LOG.debug(skip_message, obj_type) @@ -159,7 +160,7 @@ def _perform_garbage_collection(self): if self._trigger_instances_ttl and self._trigger_instances_ttl >= MINIMUM_TTL_DAYS: LOG.info(proc_message, obj_type) self._purge_trigger_instances() - eventlet.sleep(self._sleep_delay) + concurrency.sleep(self._sleep_delay) else: LOG.debug(skip_message, obj_type) @@ -167,7 +168,7 @@ def _perform_garbage_collection(self): if self._purge_inquiries: LOG.info(proc_message, obj_type) self._timeout_inquiries() - eventlet.sleep(self._sleep_delay) + concurrency.sleep(self._sleep_delay) else: LOG.debug(skip_message, obj_type) @@ -175,7 +176,7 @@ def _perform_garbage_collection(self): if self._workflow_execution_max_idle > 0: LOG.info(proc_message, obj_type) self._purge_orphaned_workflow_executions() - eventlet.sleep(self._sleep_delay) + concurrency.sleep(self._sleep_delay) else: LOG.debug(skip_message, obj_type) diff --git a/st2reactor/st2reactor/sensor/base.py b/st2reactor/st2reactor/sensor/base.py index 617ed565a6..f7350c06e3 100644 --- a/st2reactor/st2reactor/sensor/base.py +++ b/st2reactor/st2reactor/sensor/base.py @@ -13,10 +13,12 @@ # limitations under the 
License. from __future__ import absolute_import + import abc import six -import eventlet + +from st2common.util import concurrency __all__ = [ 'Sensor', @@ -117,7 +119,7 @@ def poll(self): def run(self): while True: self.poll() - eventlet.sleep(self._poll_interval) + concurrency.sleep(self._poll_interval) def get_poll_interval(self): """ diff --git a/st2reactor/tests/integration/test_garbage_collector.py b/st2reactor/tests/integration/test_garbage_collector.py index d91d8016f2..f321025911 100644 --- a/st2reactor/tests/integration/test_garbage_collector.py +++ b/st2reactor/tests/integration/test_garbage_collector.py @@ -19,9 +19,7 @@ import signal import datetime -import eventlet -from eventlet.green import subprocess - +from st2common.util import concurrency from st2common.constants import action as action_constants from st2common.util import date as date_utils from st2common.models.db.execution import ActionExecutionDB @@ -187,7 +185,7 @@ def test_garbage_collection(self): process = self._start_garbage_collector() # Give it some time to perform garbage collection and kill it - eventlet.sleep(15) + concurrency.sleep(15) process.send_signal(signal.SIGKILL) self.remove_process(process=process) @@ -235,7 +233,7 @@ def test_inquiry_garbage_collection(self): process = self._start_garbage_collector() # Give it some time to perform garbage collection and kill it - eventlet.sleep(15) + concurrency.sleep(15) process.send_signal(signal.SIGKILL) self.remove_process(process=process) @@ -254,6 +252,7 @@ def _create_inquiry(self, ttl, timestamp): executions.create_execution_object(liveaction_db) def _start_garbage_collector(self): + subprocess = concurrency.get_subprocess_module() process = subprocess.Popen(CMD_INQUIRY, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False, preexec_fn=os.setsid) self.add_process(process=process) diff --git a/st2reactor/tests/integration/test_rules_engine.py b/st2reactor/tests/integration/test_rules_engine.py index f61709e66c..ab7526925c 
100644 --- a/st2reactor/tests/integration/test_rules_engine.py +++ b/st2reactor/tests/integration/test_rules_engine.py @@ -19,8 +19,7 @@ import signal import tempfile -from eventlet.green import subprocess - +from st2common.util import concurrency from st2common.constants.timer import TIMER_ENABLED_LOG_LINE from st2common.constants.timer import TIMER_DISABLED_LOG_LINE from st2tests.base import IntegrationTestCase @@ -134,6 +133,7 @@ def test_timer_disable_explicit(self): (TIMER_DISABLED_LOG_LINE)) def _start_times_engine(self, cmd): + subprocess = concurrency.get_subprocess_module() process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False, preexec_fn=os.setsid) self.add_process(process=process) diff --git a/st2reactor/tests/integration/test_sensor_container.py b/st2reactor/tests/integration/test_sensor_container.py index 018a825ac1..d3e7c04d2f 100644 --- a/st2reactor/tests/integration/test_sensor_container.py +++ b/st2reactor/tests/integration/test_sensor_container.py @@ -19,11 +19,10 @@ import signal import psutil -import eventlet -from eventlet.green import subprocess from oslo_config import cfg import st2tests.config +from st2common.util import concurrency from st2common.models.db import db_setup from st2reactor.container.process_container import PROCESS_EXIT_TIMEOUT from st2common.util.green.shell import run_command @@ -94,7 +93,7 @@ def test_child_processes_are_killed_on_sigint(self): process = self._start_sensor_container() # Give it some time to start up - eventlet.sleep(5) + concurrency.sleep(5) # Assert process has started and is running self.assertProcessIsRunning(process=process) @@ -110,7 +109,7 @@ def test_child_processes_are_killed_on_sigint(self): # SIGINT causes graceful shutdown so give it some time to gracefuly shut down the sensor # child processes - eventlet.sleep(PROCESS_EXIT_TIMEOUT + 1) + concurrency.sleep(PROCESS_EXIT_TIMEOUT + 1) # Verify parent and children processes have exited 
self.assertProcessExited(proc=pp) @@ -122,7 +121,7 @@ def test_child_processes_are_killed_on_sigterm(self): process = self._start_sensor_container() # Give it some time to start up - eventlet.sleep(3) + concurrency.sleep(3) # Verify container process and children sensor / wrapper processes are running pp = psutil.Process(process.pid) @@ -135,7 +134,7 @@ def test_child_processes_are_killed_on_sigterm(self): # SIGTERM causes graceful shutdown so give it some time to gracefuly shut down the sensor # child processes - eventlet.sleep(PROCESS_EXIT_TIMEOUT + 5) + concurrency.sleep(PROCESS_EXIT_TIMEOUT + 5) # Verify parent and children processes have exited self.assertProcessExited(proc=pp) @@ -147,7 +146,7 @@ def test_child_processes_are_killed_on_sigkill(self): process = self._start_sensor_container() # Give it some time to start up - eventlet.sleep(4) + concurrency.sleep(4) # Verify container process and children sensor / wrapper processes are running pp = psutil.Process(process.pid) @@ -159,7 +158,7 @@ def test_child_processes_are_killed_on_sigkill(self): process.send_signal(signal.SIGKILL) # Note: On SIGKILL processes should be killed instantly - eventlet.sleep(1) + concurrency.sleep(1) # Verify parent and children processes have exited self.assertProcessExited(proc=pp) @@ -175,7 +174,7 @@ def test_single_sensor_mode(self): pp = psutil.Process(process.pid) # Give it some time to start up - eventlet.sleep(4) + concurrency.sleep(4) stdout = process.stdout.read() self.assertTrue((b'--sensor-ref argument must be provided when running in single sensor ' @@ -191,7 +190,7 @@ def test_single_sensor_mode(self): pp = psutil.Process(process.pid) # Give it some time to start up - eventlet.sleep(8) + concurrency.sleep(8) # Container should exit and not respawn a sensor in single sensor mode stdout = process.stdout.read() @@ -200,12 +199,13 @@ def test_single_sensor_mode(self): self.assertTrue(b'Not respawning a sensor since running in single sensor mode') self.assertTrue(b'Process 
container quit with exit_code 110.') - eventlet.sleep(2) + concurrency.sleep(2) self.assertProcessExited(proc=pp) self.remove_process(process=process) def _start_sensor_container(self, cmd=DEFAULT_CMD): + subprocess = concurrency.get_subprocess_module() process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False, preexec_fn=os.setsid) self.add_process(process=process) diff --git a/st2reactor/tests/integration/test_sensor_watcher.py b/st2reactor/tests/integration/test_sensor_watcher.py index 59ec452ce5..7a33cbb97e 100644 --- a/st2reactor/tests/integration/test_sensor_watcher.py +++ b/st2reactor/tests/integration/test_sensor_watcher.py @@ -13,10 +13,11 @@ # limitations under the License. from __future__ import absolute_import -import eventlet + from monotonic import monotonic from pyrabbit.api import Client +from st2common.util import concurrency from st2common.services.sensor_watcher import SensorWatcher from st2tests.base import IntegrationTestCase @@ -50,7 +51,7 @@ def delete_handler(sensor_db): start = monotonic() done = False while not done: - eventlet.sleep(0.01) + concurrency.sleep(0.01) sw_queues = self._get_sensor_watcher_amqp_queues(queue_name='st2.sensor.watch.covfefe') done = len(sw_queues) > 0 or ((monotonic() - start) < 5) diff --git a/st2reactor/tests/unit/test_process_container.py b/st2reactor/tests/unit/test_process_container.py index f353a0719b..f36b5277ea 100644 --- a/st2reactor/tests/unit/test_process_container.py +++ b/st2reactor/tests/unit/test_process_container.py @@ -16,11 +16,11 @@ import os import time -import eventlet from mock import (MagicMock, Mock, patch) import unittest2 from st2reactor.container.process_container import ProcessSensorContainer +from st2common.util import concurrency from st2common.models.db.pack import PackDB from st2common.persistence.pack import Pack @@ -35,8 +35,8 @@ class ProcessContainerTests(unittest2.TestCase): def test_no_sensors_dont_quit(self): process_container = 
ProcessSensorContainer(None, poll_interval=0.1) - process_container_thread = eventlet.spawn(process_container.run) - eventlet.sleep(0.5) + process_container_thread = concurrency.spawn(process_container.run) + concurrency.sleep(0.5) self.assertEqual(process_container.running(), 0) self.assertEqual(process_container.stopped(), False) process_container.shutdown()