From d05c236be64b3c2b2850bc2f2aa68438d7c7dfda Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Thu, 19 Sep 2019 12:17:26 +0200 Subject: [PATCH 01/11] Instead of calling eventlet directly, pass the calls through the concurrency abstraction which works with either gevent or eventlet. --- st2common/st2common/util/concurrency.py | 12 ++++++++++- st2reactor/st2reactor/cmd/timersengine.py | 7 ++++--- st2reactor/st2reactor/container/manager.py | 7 +++---- .../st2reactor/container/process_container.py | 16 +++++++------- .../st2reactor/garbage_collector/base.py | 21 ++++++++++--------- st2reactor/st2reactor/sensor/base.py | 6 ++++-- 6 files changed, 42 insertions(+), 27 deletions(-) diff --git a/st2common/st2common/util/concurrency.py b/st2common/st2common/util/concurrency.py index 62544eec3a..2735b8aa07 100644 --- a/st2common/st2common/util/concurrency.py +++ b/st2common/st2common/util/concurrency.py @@ -41,7 +41,8 @@ 'spawn', 'wait', 'cancel', - 'kill' + 'kill', + 'sleep' ] @@ -111,3 +112,12 @@ def kill(green_thread, *args, **kwargs): return green_thread.kill(*args, **kwargs) else: raise ValueError('Unsupported concurrency library') + + +def sleep(*args, **kwargs): + if CONCURRENCY_LIBRARY == 'eventlet': + return eventlet.sleep(*args, **kwargs) + elif CONCURRENCY_LIBRARY == 'gevent': + return gevent.sleep(*args, **kwargs) + else: + raise ValueError('Unsupported concurrency library') diff --git a/st2reactor/st2reactor/cmd/timersengine.py b/st2reactor/st2reactor/cmd/timersengine.py index 71d3fb29c5..b91dcaefcb 100644 --- a/st2reactor/st2reactor/cmd/timersengine.py +++ b/st2reactor/st2reactor/cmd/timersengine.py @@ -13,13 +13,14 @@ # limitations under the License. from __future__ import absolute_import + import os import sys -import eventlet from oslo_config import cfg from st2common import log as logging +from st2common.util import concurrency from st2common.constants.timer import TIMER_ENABLED_LOG_LINE, TIMER_DISABLED_LOG_LINE from st2common.logging.misc import get_logger_name_for_module from st2common.service_setup import setup as common_setup @@ -61,7 +62,7 @@ def _run_worker(): if cfg.CONF.timer.enable or cfg.CONF.timersengine.enable: local_tz = cfg.CONF.timer.local_timezone or cfg.CONF.timersengine.local_timezone timer = St2Timer(local_timezone=local_tz) - timer_thread = eventlet.spawn(_kickoff_timer, timer) + timer_thread = concurrency.spawn(_kickoff_timer, timer) LOG.info(TIMER_ENABLED_LOG_LINE) return timer_thread.wait() else: @@ -84,7 +85,7 @@ def main(): return _run_worker() except SystemExit as exit_code: sys.exit(exit_code) - except: + except Exception: LOG.exception('(PID=%s) TimerEngine quit due to exception.', os.getpid()) return 1 finally: diff --git a/st2reactor/st2reactor/container/manager.py b/st2reactor/st2reactor/container/manager.py index 917481ddd0..b655305a7b 100644 --- a/st2reactor/st2reactor/container/manager.py +++ b/st2reactor/st2reactor/container/manager.py @@ -13,12 +13,11 @@ # limitations under the License. 
from __future__ import absolute_import + import os import sys import signal -import eventlet - from st2common import log as logging from st2reactor.container.process_container import ProcessSensorContainer from st2common.services.sensor_watcher import SensorWatcher @@ -75,7 +74,7 @@ def _spin_container_and_wait(self, sensors): self._sensor_container = ProcessSensorContainer( sensors=sensors, single_sensor_mode=self._single_sensor_mode) - self._container_thread = eventlet.spawn(self._sensor_container.run) + self._container_thread = concurrency.spawn(self._sensor_container.run) LOG.debug('Starting sensor CUD watcher...') self._sensors_watcher.start() @@ -90,7 +89,7 @@ def _spin_container_and_wait(self, sensors): LOG.info('(PID:%s) SensorContainer stopped. Reason - %s', os.getpid(), sys.exc_info()[0].__name__) - eventlet.kill(self._container_thread) + concurrency.kill(self._container_thread) self._container_thread = None return exit_code diff --git a/st2reactor/st2reactor/container/process_container.py b/st2reactor/st2reactor/container/process_container.py index 81fe9654ac..5eee551cf8 100644 --- a/st2reactor/st2reactor/container/process_container.py +++ b/st2reactor/st2reactor/container/process_container.py @@ -13,6 +13,7 @@ # limitations under the License. from __future__ import absolute_import + import os import sys import time @@ -22,11 +23,10 @@ from collections import defaultdict import six -import eventlet -from eventlet.support import greenlets as greenlet from oslo_config import cfg from st2common import log as logging +from st2common.util import concurrency from st2common.constants.error_messages import PACK_VIRTUALENV_DOESNT_EXIST from st2common.constants.error_messages import PACK_VIRTUALENV_USES_PYTHON3 from st2common.constants.system import API_URL_ENV_VARIABLE_NAME @@ -129,6 +129,8 @@ def __init__(self, sensors, poll_interval=5, single_sensor_mode=False, dispatche def run(self): self._run_all_sensors() + success_exception_cls = concurrency.get_greenlet_exit_exception_class() + try: while not self._stopped: # Poll for all running processes @@ -140,8 +142,8 @@ def run(self): else: LOG.debug('No active sensors') - eventlet.sleep(self._poll_interval) - except greenlet.GreenletExit: + concurrency.sleep(self._poll_interval) + except success_exception_cls: # This exception is thrown when sensor container manager # kills the thread which runs process container. Not sure # if this is the best thing to do. 
@@ -180,8 +182,8 @@ def _poll_sensors_for_results(self, sensor_ids): # Try to respawn a dead process (maybe it was a simple failure which can be # resolved with a restart) - eventlet.spawn_n(self._respawn_sensor, sensor_id=sensor_id, sensor=sensor, - exit_code=status) + concurrency.spawn(self._respawn_sensor, sensor_id=sensor_id, sensor=sensor, + exit_code=status) else: sensor_start_time = self._sensor_start_times[sensor_id] sensor_respawn_count = self._sensor_respawn_counts[sensor_id] @@ -434,7 +436,7 @@ def _respawn_sensor(self, sensor_id, sensor, exit_code): self._sensor_respawn_counts[sensor_id] += 1 sleep_delay = (SENSOR_RESPAWN_DELAY * self._sensor_respawn_counts[sensor_id]) - eventlet.sleep(sleep_delay) + concurrency.sleep(sleep_delay) try: self._spawn_sensor_process(sensor=sensor) diff --git a/st2reactor/st2reactor/garbage_collector/base.py b/st2reactor/st2reactor/garbage_collector/base.py index ae9c747b6f..c1d4c14ab1 100644 --- a/st2reactor/st2reactor/garbage_collector/base.py +++ b/st2reactor/st2reactor/garbage_collector/base.py @@ -23,11 +23,10 @@ import random import six -import eventlet -from eventlet.support import greenlets as greenlet from oslo_config import cfg from st2common import log as logging +from st2common.util import concurrency from st2common.constants.exit_codes import SUCCESS_EXIT_CODE from st2common.constants.exit_codes import FAILURE_EXIT_CODE from st2common.constants.garbage_collection import DEFAULT_COLLECTION_INTERVAL @@ -81,11 +80,13 @@ def run(self): # Wait a couple of seconds before performing initial collection to prevent thundering herd # effect when restarting multiple services at the same time jitter_seconds = random.uniform(0, 3) - eventlet.sleep(jitter_seconds) + concurrency.sleep(jitter_seconds) + + success_exception_cls = concurrency.get_greenlet_exit_exception_class() try: self._main_loop() - except greenlet.GreenletExit: + except success_exception_cls: self._running = False return SUCCESS_EXIT_CODE except Exception as e: @@ -111,7 +112,7 @@ def _main_loop(self): LOG.info('Sleeping for %s seconds before next garbage collection...' 
% (self._collection_interval)) - eventlet.sleep(self._collection_interval) + concurrency.sleep(self._collection_interval) def _validate_ttl_values(self): """ @@ -142,7 +143,7 @@ def _perform_garbage_collection(self): if self._action_executions_ttl and self._action_executions_ttl >= MINIMUM_TTL_DAYS: LOG.info(proc_message, obj_type) self._purge_action_executions() - eventlet.sleep(self._sleep_delay) + concurrency.sleep(self._sleep_delay) else: LOG.debug(skip_message, obj_type) @@ -151,7 +152,7 @@ def _perform_garbage_collection(self): self._action_executions_output_ttl >= MINIMUM_TTL_DAYS_EXECUTION_OUTPUT: LOG.info(proc_message, obj_type) self._purge_action_executions_output() - eventlet.sleep(self._sleep_delay) + concurrency.sleep(self._sleep_delay) else: LOG.debug(skip_message, obj_type) @@ -159,7 +160,7 @@ def _perform_garbage_collection(self): if self._trigger_instances_ttl and self._trigger_instances_ttl >= MINIMUM_TTL_DAYS: LOG.info(proc_message, obj_type) self._purge_trigger_instances() - eventlet.sleep(self._sleep_delay) + concurrency.sleep(self._sleep_delay) else: LOG.debug(skip_message, obj_type) @@ -167,7 +168,7 @@ def _perform_garbage_collection(self): if self._purge_inquiries: LOG.info(proc_message, obj_type) self._timeout_inquiries() - eventlet.sleep(self._sleep_delay) + concurrency.sleep(self._sleep_delay) else: LOG.debug(skip_message, obj_type) @@ -175,7 +176,7 @@ def _perform_garbage_collection(self): if self._workflow_execution_max_idle > 0: LOG.info(proc_message, obj_type) self._purge_orphaned_workflow_executions() - eventlet.sleep(self._sleep_delay) + concurrency.sleep(self._sleep_delay) else: LOG.debug(skip_message, obj_type) diff --git a/st2reactor/st2reactor/sensor/base.py b/st2reactor/st2reactor/sensor/base.py index 617ed565a6..f7350c06e3 100644 --- a/st2reactor/st2reactor/sensor/base.py +++ b/st2reactor/st2reactor/sensor/base.py @@ -13,10 +13,12 @@ # limitations under the License. from __future__ import absolute_import + import abc import six -import eventlet + +from st2common.util import concurrency __all__ = [ 'Sensor', @@ -117,7 +119,7 @@ def poll(self): def run(self): while True: self.poll() - eventlet.sleep(self._poll_interval) + concurrency.sleep(self._poll_interval) def get_poll_interval(self): """ From ee52f63bdb63786a9ebd6ff5719a91640feba0e7 Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Thu, 19 Sep 2019 12:25:52 +0200 Subject: [PATCH 02/11] Add missing change. --- st2common/st2common/util/concurrency.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/st2common/st2common/util/concurrency.py b/st2common/st2common/util/concurrency.py index 2735b8aa07..5314f280e8 100644 --- a/st2common/st2common/util/concurrency.py +++ b/st2common/st2common/util/concurrency.py @@ -121,3 +121,12 @@ def sleep(*args, **kwargs): return gevent.sleep(*args, **kwargs) else: raise ValueError('Unsupported concurrency library') + + +def get_greenlet_exit_exception_class(): + if CONCURRENCY_LIBRARY == 'eventlet': + return eventlet.support.greenlets.GreenletExit + elif CONCURRENCY_LIBRARY == 'gevent': + return gevent.GreenletExit + else: + raise ValueError('Unsupported concurrency library') From b1360e61ea72f34d9a9c6642e86ff681356fdc54 Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Thu, 19 Sep 2019 20:14:02 +0200 Subject: [PATCH 03/11] Add a missing import. 
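
The first patch in this series switched st2reactor/st2reactor/container/manager.py over to the
concurrency wrapper (concurrency.spawn / concurrency.kill) without importing it, so the
"concurrency" name was undefined in that module. The calling pattern, mirroring the existing
unit test (illustrative sketch only, not part of this diff):

    from st2common.util import concurrency
    from st2reactor.container.process_container import ProcessSensorContainer

    container = ProcessSensorContainer(sensors=None, poll_interval=0.1)

    # Run the container loop in a green thread via the abstraction layer
    # (eventlet or gevent, depending on how st2common is configured)
    container_thread = concurrency.spawn(container.run)
    concurrency.sleep(0.5)

    # Stop the loop by killing the green thread (raises GreenletExit inside run())
    concurrency.kill(container_thread)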
--- st2reactor/st2reactor/container/manager.py | 1 + 1 file changed, 1 insertion(+) diff --git a/st2reactor/st2reactor/container/manager.py b/st2reactor/st2reactor/container/manager.py index b655305a7b..6942c78a81 100644 --- a/st2reactor/st2reactor/container/manager.py +++ b/st2reactor/st2reactor/container/manager.py @@ -19,6 +19,7 @@ import signal from st2common import log as logging +from st2common.util import concurrency from st2reactor.container.process_container import ProcessSensorContainer from st2common.services.sensor_watcher import SensorWatcher from st2common.models.system.common import ResourceReference From e8e8e4902989e53b437f560aef3b306ad7b5b684 Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Thu, 19 Sep 2019 20:15:24 +0200 Subject: [PATCH 04/11] Add missing function to __all__. --- st2common/st2common/util/concurrency.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/st2common/st2common/util/concurrency.py b/st2common/st2common/util/concurrency.py index 5314f280e8..c40d2c311d 100644 --- a/st2common/st2common/util/concurrency.py +++ b/st2common/st2common/util/concurrency.py @@ -42,7 +42,8 @@ 'wait', 'cancel', 'kill', - 'sleep' + 'sleep', + 'get_greenlet_exit_exception_class' ] From 1b343eefd43c370b7786e5e91bbffc4eae071c8e Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Thu, 19 Sep 2019 20:20:59 +0200 Subject: [PATCH 05/11] Also update test code in st2reactor to utilize our concurrency wrapper. --- .../integration/test_garbage_collector.py | 9 ++++---- .../tests/integration/test_rules_engine.py | 4 ++-- .../integration/test_sensor_container.py | 22 +++++++++---------- .../tests/integration/test_sensor_watcher.py | 5 +++-- .../tests/unit/test_process_container.py | 6 ++--- 5 files changed, 23 insertions(+), 23 deletions(-) diff --git a/st2reactor/tests/integration/test_garbage_collector.py b/st2reactor/tests/integration/test_garbage_collector.py index d91d8016f2..f321025911 100644 --- a/st2reactor/tests/integration/test_garbage_collector.py +++ b/st2reactor/tests/integration/test_garbage_collector.py @@ -19,9 +19,7 @@ import signal import datetime -import eventlet -from eventlet.green import subprocess - +from st2common.util import concurrency from st2common.constants import action as action_constants from st2common.util import date as date_utils from st2common.models.db.execution import ActionExecutionDB @@ -187,7 +185,7 @@ def test_garbage_collection(self): process = self._start_garbage_collector() # Give it some time to perform garbage collection and kill it - eventlet.sleep(15) + concurrency.sleep(15) process.send_signal(signal.SIGKILL) self.remove_process(process=process) @@ -235,7 +233,7 @@ def test_inquiry_garbage_collection(self): process = self._start_garbage_collector() # Give it some time to perform garbage collection and kill it - eventlet.sleep(15) + concurrency.sleep(15) process.send_signal(signal.SIGKILL) self.remove_process(process=process) @@ -254,6 +252,7 @@ def _create_inquiry(self, ttl, timestamp): executions.create_execution_object(liveaction_db) def _start_garbage_collector(self): + subprocess = concurrency.get_subprocess_module() process = subprocess.Popen(CMD_INQUIRY, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False, preexec_fn=os.setsid) self.add_process(process=process) diff --git a/st2reactor/tests/integration/test_rules_engine.py b/st2reactor/tests/integration/test_rules_engine.py index f61709e66c..ab7526925c 100644 --- a/st2reactor/tests/integration/test_rules_engine.py +++ 
b/st2reactor/tests/integration/test_rules_engine.py @@ -19,8 +19,7 @@ import signal import tempfile -from eventlet.green import subprocess - +from st2common.util import concurrency from st2common.constants.timer import TIMER_ENABLED_LOG_LINE from st2common.constants.timer import TIMER_DISABLED_LOG_LINE from st2tests.base import IntegrationTestCase @@ -134,6 +133,7 @@ def test_timer_disable_explicit(self): (TIMER_DISABLED_LOG_LINE)) def _start_times_engine(self, cmd): + subprocess = concurrency.get_subprocess_module() process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False, preexec_fn=os.setsid) self.add_process(process=process) diff --git a/st2reactor/tests/integration/test_sensor_container.py b/st2reactor/tests/integration/test_sensor_container.py index 018a825ac1..d3e7c04d2f 100644 --- a/st2reactor/tests/integration/test_sensor_container.py +++ b/st2reactor/tests/integration/test_sensor_container.py @@ -19,11 +19,10 @@ import signal import psutil -import eventlet -from eventlet.green import subprocess from oslo_config import cfg import st2tests.config +from st2common.util import concurrency from st2common.models.db import db_setup from st2reactor.container.process_container import PROCESS_EXIT_TIMEOUT from st2common.util.green.shell import run_command @@ -94,7 +93,7 @@ def test_child_processes_are_killed_on_sigint(self): process = self._start_sensor_container() # Give it some time to start up - eventlet.sleep(5) + concurrency.sleep(5) # Assert process has started and is running self.assertProcessIsRunning(process=process) @@ -110,7 +109,7 @@ def test_child_processes_are_killed_on_sigint(self): # SIGINT causes graceful shutdown so give it some time to gracefuly shut down the sensor # child processes - eventlet.sleep(PROCESS_EXIT_TIMEOUT + 1) + concurrency.sleep(PROCESS_EXIT_TIMEOUT + 1) # Verify parent and children processes have exited self.assertProcessExited(proc=pp) @@ -122,7 +121,7 @@ def test_child_processes_are_killed_on_sigterm(self): process = self._start_sensor_container() # Give it some time to start up - eventlet.sleep(3) + concurrency.sleep(3) # Verify container process and children sensor / wrapper processes are running pp = psutil.Process(process.pid) @@ -135,7 +134,7 @@ def test_child_processes_are_killed_on_sigterm(self): # SIGTERM causes graceful shutdown so give it some time to gracefuly shut down the sensor # child processes - eventlet.sleep(PROCESS_EXIT_TIMEOUT + 5) + concurrency.sleep(PROCESS_EXIT_TIMEOUT + 5) # Verify parent and children processes have exited self.assertProcessExited(proc=pp) @@ -147,7 +146,7 @@ def test_child_processes_are_killed_on_sigkill(self): process = self._start_sensor_container() # Give it some time to start up - eventlet.sleep(4) + concurrency.sleep(4) # Verify container process and children sensor / wrapper processes are running pp = psutil.Process(process.pid) @@ -159,7 +158,7 @@ def test_child_processes_are_killed_on_sigkill(self): process.send_signal(signal.SIGKILL) # Note: On SIGKILL processes should be killed instantly - eventlet.sleep(1) + concurrency.sleep(1) # Verify parent and children processes have exited self.assertProcessExited(proc=pp) @@ -175,7 +174,7 @@ def test_single_sensor_mode(self): pp = psutil.Process(process.pid) # Give it some time to start up - eventlet.sleep(4) + concurrency.sleep(4) stdout = process.stdout.read() self.assertTrue((b'--sensor-ref argument must be provided when running in single sensor ' @@ -191,7 +190,7 @@ def test_single_sensor_mode(self): pp = 
psutil.Process(process.pid) # Give it some time to start up - eventlet.sleep(8) + concurrency.sleep(8) # Container should exit and not respawn a sensor in single sensor mode stdout = process.stdout.read() @@ -200,12 +199,13 @@ def test_single_sensor_mode(self): self.assertTrue(b'Not respawning a sensor since running in single sensor mode') self.assertTrue(b'Process container quit with exit_code 110.') - eventlet.sleep(2) + concurrency.sleep(2) self.assertProcessExited(proc=pp) self.remove_process(process=process) def _start_sensor_container(self, cmd=DEFAULT_CMD): + subprocess = concurrency.get_subprocess_module() process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False, preexec_fn=os.setsid) self.add_process(process=process) diff --git a/st2reactor/tests/integration/test_sensor_watcher.py b/st2reactor/tests/integration/test_sensor_watcher.py index 59ec452ce5..7a33cbb97e 100644 --- a/st2reactor/tests/integration/test_sensor_watcher.py +++ b/st2reactor/tests/integration/test_sensor_watcher.py @@ -13,10 +13,11 @@ # limitations under the License. from __future__ import absolute_import -import eventlet + from monotonic import monotonic from pyrabbit.api import Client +from st2common.util import concurrency from st2common.services.sensor_watcher import SensorWatcher from st2tests.base import IntegrationTestCase @@ -50,7 +51,7 @@ def delete_handler(sensor_db): start = monotonic() done = False while not done: - eventlet.sleep(0.01) + concurrency.sleep(0.01) sw_queues = self._get_sensor_watcher_amqp_queues(queue_name='st2.sensor.watch.covfefe') done = len(sw_queues) > 0 or ((monotonic() - start) < 5) diff --git a/st2reactor/tests/unit/test_process_container.py b/st2reactor/tests/unit/test_process_container.py index f353a0719b..f36b5277ea 100644 --- a/st2reactor/tests/unit/test_process_container.py +++ b/st2reactor/tests/unit/test_process_container.py @@ -16,11 +16,11 @@ import os import time -import eventlet from mock import (MagicMock, Mock, patch) import unittest2 from st2reactor.container.process_container import ProcessSensorContainer +from st2common.util import concurrency from st2common.models.db.pack import PackDB from st2common.persistence.pack import Pack @@ -35,8 +35,8 @@ class ProcessContainerTests(unittest2.TestCase): def test_no_sensors_dont_quit(self): process_container = ProcessSensorContainer(None, poll_interval=0.1) - process_container_thread = eventlet.spawn(process_container.run) - eventlet.sleep(0.5) + process_container_thread = concurrency.spawn(process_container.run) + concurrency.sleep(0.5) self.assertEqual(process_container.running(), 0) self.assertEqual(process_container.stopped(), False) process_container.shutdown() From 6478e3d31ffc3a4ae5ad7a3eafad64c8875733ae Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Mon, 23 Sep 2019 18:04:52 +0200 Subject: [PATCH 06/11] Update st2common.log.setup function so it takes new "st2_config_path" and supports logging config paths which are relative to st2.conf file directory. 
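
This allows a service to pass along the path of the st2.conf file it was started with so that a
"./" prefixed logging config path is resolved relative to the directory st2.conf lives in rather
than relative to the current working directory. For example (paths are illustrative):

    from st2common import log as logging

    # "./logging.sensorcontainer.conf" doesn't exist relative to the current
    # working directory, so it's resolved against the directory which contains
    # st2.conf and /etc/st2/logging.sensorcontainer.conf is loaded instead
    logging.setup('./logging.sensorcontainer.conf',
                  st2_conf_path='/etc/st2/st2.conf')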
--- st2common/st2common/log.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/st2common/st2common/log.py b/st2common/st2common/log.py index cf1347ea81..15c25002a5 100644 --- a/st2common/st2common/log.py +++ b/st2common/st2common/log.py @@ -181,10 +181,22 @@ def _redirect_stderr(): sys.stderr = LoggingStream('STDERR') -def setup(config_file, redirect_stderr=True, excludes=None, disable_existing_loggers=False): +def setup(config_file, redirect_stderr=True, excludes=None, disable_existing_loggers=False, + st2_conf_path=None): """ Configure logging from file. + + :param st2_conf_path: Optional path to st2.conf file. If provided and "config_file" path is + relative to st2.conf path, the config_file path will get resolved to full + absolute path relative to st2.conf. + :type st2_conf_path: ``str`` """ + if st2_conf_path and config_file[:2] == './' and not os.path.isfile(config_file): + # Logging config path is relative to st2.conf, resolve it to full absolute path + directory = os.path.dirname(st2_conf_path) + config_file_name = os.path.basename(config_file) + config_file = os.path.join(directory, config_file_name) + try: logging.config.fileConfig(config_file, defaults=None, From b5780b8ac29fd0e82af5929e821ce23d3a07ee2a Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Tue, 24 Sep 2019 10:50:59 +0200 Subject: [PATCH 07/11] Allow user to pass "wrapper_script_path" and "create_token" arguments to the ProcessSensorContainer class. --- .../st2reactor/container/process_container.py | 50 ++++++++++++------- 1 file changed, 31 insertions(+), 19 deletions(-) diff --git a/st2reactor/st2reactor/container/process_container.py b/st2reactor/st2reactor/container/process_container.py index 5eee551cf8..6ac36dd113 100644 --- a/st2reactor/st2reactor/container/process_container.py +++ b/st2reactor/st2reactor/container/process_container.py @@ -79,16 +79,26 @@ class ProcessSensorContainer(object): Sensor container which runs sensors in a separate process. """ - def __init__(self, sensors, poll_interval=5, single_sensor_mode=False, dispatcher=None): + def __init__(self, sensors, poll_interval=5, single_sensor_mode=False, dispatcher=None, + wrapper_script_path=WRAPPER_SCRIPT_PATH, create_token=True): """ :param sensors: A list of sensor dicts. :type sensors: ``list`` of ``dict`` :param poll_interval: How long to sleep between each poll for running / dead sensors. :type poll_interval: ``float`` + + :param wrapper_script_path: Path to the sensor wrapper script. + :type wrapper_script_path: ``str`` + + :param create_token: True to create temporary authentication token for the purpose for each + sensor process and add it to that process environment variables. 
+ :type create_token: ``bool`` """ self._poll_interval = poll_interval self._single_sensor_mode = single_sensor_mode + self._wrapper_script_path = wrapper_script_path + self._create_token = create_token if self._single_sensor_mode: # For more immediate feedback we use lower poll interval when running in single sensor @@ -98,9 +108,7 @@ def __init__(self, sensors, poll_interval=5, single_sensor_mode=False, dispatche self._sensors = {} # maps sensor_id -> sensor object self._processes = {} # maps sensor_id -> sensor process - if not dispatcher: - dispatcher = TriggerDispatcher(LOG) - self._dispatcher = dispatcher + self._dispatcher = dispatcher or TriggerDispatcher(LOG) self._stopped = False self._exit_code = None # exit code with which this process should exit @@ -303,7 +311,7 @@ def _spawn_sensor_process(self, sensor): args = [ python_path, - WRAPPER_SCRIPT_PATH, + self._wrapper_script_path, '--pack=%s' % (sensor['pack']), '--file-path=%s' % (sensor['file_path']), '--class-name=%s' % (sensor['class_name']), @@ -329,22 +337,26 @@ def _spawn_sensor_process(self, sensor): else: env['PYTHONPATH'] = sandbox_python_path - # Include full api URL and API token specific to that sensor - ttl = cfg.CONF.auth.service_token_ttl - metadata = { - 'service': 'sensors_container', - 'sensor_path': sensor['file_path'], - 'sensor_class': sensor['class_name'] - } - temporary_token = create_token(username='sensors_container', ttl=ttl, metadata=metadata, - service=True) + if self._create_token: + # Include full api URL and API token specific to that sensor + LOG.debug('Creating temporary auth token for sensor %s' % (sensor['class_name'])) + + ttl = cfg.CONF.auth.service_token_ttl + metadata = { + 'service': 'sensors_container', + 'sensor_path': sensor['file_path'], + 'sensor_class': sensor['class_name'] + } + temporary_token = create_token(username='sensors_container', ttl=ttl, metadata=metadata, + service=True) + + env[API_URL_ENV_VARIABLE_NAME] = get_full_public_api_url() + env[AUTH_TOKEN_ENV_VARIABLE_NAME] = temporary_token.token - env[API_URL_ENV_VARIABLE_NAME] = get_full_public_api_url() - env[AUTH_TOKEN_ENV_VARIABLE_NAME] = temporary_token.token + # TODO 1: Purge temporary token when service stops or sensor process dies + # TODO 2: Store metadata (wrapper process id) with the token and delete + # tokens for old, dead processes on startup - # TODO 1: Purge temporary token when service stops or sensor process dies - # TODO 2: Store metadata (wrapper process id) with the token and delete - # tokens for old, dead processes on startup cmd = ' '.join(args) LOG.debug('Running sensor subprocess (cmd="%s")', cmd) From cebeccc3cd9da2f5f6b6a3a2975a13fdf4f9bf9c Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Tue, 24 Sep 2019 17:14:13 +0200 Subject: [PATCH 08/11] Move code for setting CLI arguments which are passed to the wrapper script to a method which can be overriden in derived classes. 
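
This way a derived container class can customize how the wrapper script is invoked without
duplicating the whole process spawning logic. A sketch of such a subclass (the appended flag is
hypothetical and only serves as an illustration):

    from st2reactor.container.process_container import ProcessSensorContainer

    class CustomSensorContainer(ProcessSensorContainer):
        def _get_args_for_wrapper_script(self, python_binary, sensor):
            # Re-use the arguments built by the base class and append a
            # hypothetical flag understood by a custom wrapper script
            args = super(CustomSensorContainer, self)._get_args_for_wrapper_script(
                python_binary=python_binary, sensor=sensor)
            args.append('--custom-flag=value')
            return args

Combined with the "wrapper_script_path" and "create_token" constructor arguments added in the
previous commit, this makes it possible to run sensors through a custom wrapper script.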
--- .../st2reactor/container/process_container.py | 56 ++++++++++++------- 1 file changed, 36 insertions(+), 20 deletions(-) diff --git a/st2reactor/st2reactor/container/process_container.py b/st2reactor/st2reactor/container/process_container.py index 6ac36dd113..592447b811 100644 --- a/st2reactor/st2reactor/container/process_container.py +++ b/st2reactor/st2reactor/container/process_container.py @@ -304,26 +304,7 @@ def _spawn_sensor_process(self, sensor): msg = PACK_VIRTUALENV_USES_PYTHON3 % format_values raise Exception(msg) - trigger_type_refs = sensor['trigger_types'] or [] - trigger_type_refs = ','.join(trigger_type_refs) - - parent_args = json.dumps(sys.argv[1:]) - - args = [ - python_path, - self._wrapper_script_path, - '--pack=%s' % (sensor['pack']), - '--file-path=%s' % (sensor['file_path']), - '--class-name=%s' % (sensor['class_name']), - '--trigger-type-refs=%s' % (trigger_type_refs), - '--parent-args=%s' % (parent_args) - ] - - if sensor['poll_interval']: - args.append('--poll-interval=%s' % (sensor['poll_interval'])) - - sandbox_python_path = get_sandbox_python_path(inherit_from_parent=True, - inherit_parent_virtualenv=True) + args = self._get_args_for_wrapper_script(python_binary=python_path, sensor=sensor) if self._enable_common_pack_libs: pack_common_libs_path = get_pack_common_libs_path_for_pack_ref(pack_ref=pack_ref) @@ -332,6 +313,9 @@ def _spawn_sensor_process(self, sensor): env = os.environ.copy() + sandbox_python_path = get_sandbox_python_path(inherit_from_parent=True, + inherit_parent_virtualenv=True) + if self._enable_common_pack_libs and pack_common_libs_path: env['PYTHONPATH'] = pack_common_libs_path + ':' + sandbox_python_path else: @@ -473,6 +457,38 @@ def _should_respawn_sensor(self, sensor_id, sensor, exit_code): return True + def _get_args_for_wrapper_script(self, python_binary, sensor): + """ + Return CLI arguments passed to the sensor wrapper script. + + :param python_binary: Python binary used to execute wrapper script. + :type python_binary: ``str`` + + :param sensor: Sensor object dictionary. + :type sensor: ``dict`` + + :rtype: ``list`` + """ + trigger_type_refs = sensor['trigger_types'] or [] + trigger_type_refs = ','.join(trigger_type_refs) + + parent_args = json.dumps(sys.argv[1:]) + + args = [ + python_binary, + self._wrapper_script_path, + '--pack=%s' % (sensor['pack']), + '--file-path=%s' % (sensor['file_path']), + '--class-name=%s' % (sensor['class_name']), + '--trigger-type-refs=%s' % (trigger_type_refs), + '--parent-args=%s' % (parent_args) + ] + + if sensor['poll_interval']: + args.append('--poll-interval=%s' % (sensor['poll_interval'])) + + return args + def _get_sensor_id(self, sensor): """ Return unique identifier for the provider sensor dict. From 2bb212b0f6a977b99592e9a323ec0dd47ef722a7 Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Mon, 21 Oct 2019 18:55:44 +0200 Subject: [PATCH 09/11] Allow user to specify additional options which are passed to "pip" command when installing pack dependencies into pack virtualenv using "pip" by seting action_runner.pip_opts config option. 
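
For example (the values are illustrative), with the following set in st2.conf:

    [actionrunner]
    pip_opts = --no-cache-dir,--index-url=https://pypi.example.org/simple

pack requirements are installed with a command along the lines of:

    pip install --no-cache-dir --index-url=https://pypi.example.org/simple -U -r requirements.txt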
--- st2common/st2common/config.py | 4 ++++ st2common/st2common/util/virtualenvs.py | 13 ++++++++++--- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/st2common/st2common/config.py b/st2common/st2common/config.py index 1b31b93e77..5ad2fe0971 100644 --- a/st2common/st2common/config.py +++ b/st2common/st2common/config.py @@ -374,6 +374,10 @@ def register_opts(ignore_errors=False): 'virtualenv_opts', default=['--system-site-packages'], help='List of virtualenv options to be passsed to "virtualenv" command that ' 'creates pack virtualenv.'), + cfg.ListOpt( + 'pip_opts', default=[], + help='List of pip options to be passed to "pip install" command when installing pack ' + 'dependencies into pack virtual environment.'), cfg.BoolOpt( 'stream_output', default=True, help='True to store and stream action output (stdout and stderr) in real-time.') diff --git a/st2common/st2common/util/virtualenvs.py b/st2common/st2common/util/virtualenvs.py index b1eacad31d..0ac1c5977c 100644 --- a/st2common/st2common/util/virtualenvs.py +++ b/st2common/st2common/util/virtualenvs.py @@ -158,7 +158,7 @@ def create_virtualenv(virtualenv_path, logger=None, include_pip=True, include_se python_binary = cfg.CONF.actionrunner.python_binary python3_binary = cfg.CONF.actionrunner.python3_binary virtualenv_binary = cfg.CONF.actionrunner.virtualenv_binary - virtualenv_opts = cfg.CONF.actionrunner.virtualenv_opts + virtualenv_opts = cfg.CONF.actionrunner.virtualenv_opts or [] if not os.path.isfile(python_binary): raise Exception('Python binary "%s" doesn\'t exist' % (python_binary)) @@ -237,6 +237,7 @@ def install_requirements(virtualenv_path, requirements_file_path, proxy_config=N """ logger = logger or LOG pip_path = os.path.join(virtualenv_path, 'bin/pip') + pip_opts = cfg.CONF.actionrunner.pip_opts or [] cmd = [pip_path] if proxy_config: @@ -253,7 +254,10 @@ def install_requirements(virtualenv_path, requirements_file_path, proxy_config=N if cert: cmd.extend(['--cert', cert]) - cmd.extend(['install', '-U', '-r', requirements_file_path]) + cmd.append('install') + cmd.extend(pip_opts) + cmd.extend(['-U', '-r', requirements_file_path]) + env = get_env_for_subprocess_command() logger.debug('Installing requirements from file %s with command %s.', @@ -278,6 +282,7 @@ def install_requirement(virtualenv_path, requirement, proxy_config=None, logger= """ logger = logger or LOG pip_path = os.path.join(virtualenv_path, 'bin/pip') + pip_opts = cfg.CONF.actionrunner.pip_opts or [] cmd = [pip_path] if proxy_config: @@ -294,7 +299,9 @@ def install_requirement(virtualenv_path, requirement, proxy_config=None, logger= if cert: cmd.extend(['--cert', cert]) - cmd.extend(['install', requirement]) + cmd.append('install') + cmd.extend(pip_opts) + cmd.extend([requirement]) env = get_env_for_subprocess_command() logger.debug('Installing requirement %s with command %s.', requirement, ' '.join(cmd)) From 0568f194a1bd3678debe8f952bc3260477e621b4 Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Mon, 21 Oct 2019 19:11:04 +0200 Subject: [PATCH 10/11] Add changelog entry. --- CHANGELOG.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 6d79ce87f8..729f24c59a 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -28,6 +28,9 @@ Changed writing very large executions (executions with large results) to the database. 
#4767 * Improved development instructions in requirements.txt and dist_utils.py comment headers (improvement) #4774 +* Add new ``action_runner.pip_opts`` st2.conf config option which allows user to specify a list + of command line option which are passed to ``pip install`` command when installing pack + dependencies into a pack specific virtual environment. #4792 Fixed ~~~~~ From fba49352fe2fae4c976b955443721bd7f90c426e Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Mon, 21 Oct 2019 19:11:14 +0200 Subject: [PATCH 11/11] Re-generate sample config. --- conf/st2.conf.sample | 2 ++ 1 file changed, 2 insertions(+) diff --git a/conf/st2.conf.sample b/conf/st2.conf.sample index d5d295d058..37985b34ae 100644 --- a/conf/st2.conf.sample +++ b/conf/st2.conf.sample @@ -8,6 +8,8 @@ enable = True emit_when = succeeded,failed,timeout,canceled,abandoned # comma separated list allowed here. [actionrunner] +# List of pip options to be passed to "pip install" command when installing pack dependencies into pack virtual environment. +pip_opts = # comma separated list allowed here. # Internal pool size for dispatcher used by regular actions. actions_pool_size = 60 # Default log level to use for Python runner actions. Can be overriden on invocation basis using "log_level" runner parameter.