From 43e66f4693f262cf962cb3d75191ab2c55c2fc0d Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Fri, 29 Nov 2019 21:04:03 +0100 Subject: [PATCH 1/2] Update more code in st2common to utilize st2common.util.concurrency wrapper instead of calling eventlet directly. --- .../python_runner/python_runner.py | 4 +- st2common/st2common/runners/base.py | 4 +- st2common/st2common/runners/parallel_ssh.py | 15 ++++--- st2common/st2common/runners/paramiko_ssh.py | 8 ++-- st2common/st2common/runners/utils.py | 6 ++- .../st2common/services/sensor_watcher.py | 6 +-- .../st2common/services/triggerwatcher.py | 16 +++---- .../transport/connection_retry_wrapper.py | 5 ++- st2common/st2common/transport/consumers.py | 4 +- st2common/st2common/util/concurrency.py | 43 ++++++++++++++++++- st2common/st2common/util/shell.py | 13 +++--- 11 files changed, 89 insertions(+), 35 deletions(-) diff --git a/contrib/runners/python_runner/python_runner/python_runner.py b/contrib/runners/python_runner/python_runner/python_runner.py index dae67f33e2..6d7cdea4c0 100644 --- a/contrib/runners/python_runner/python_runner/python_runner.py +++ b/contrib/runners/python_runner/python_runner/python_runner.py @@ -23,13 +23,14 @@ from subprocess import list2cmdline import six -from eventlet.green import subprocess + from oslo_config import cfg from six.moves import StringIO from st2common import log as logging from st2common.runners.base import GitWorktreeActionRunner from st2common.runners.base import get_metadata as get_runner_metadata +from st2common.util import concurrency from st2common.util.green.shell import run_command from st2common.constants.action import ACTION_OUTPUT_RESULT_DELIMITER from st2common.constants.action import LIVEACTION_STATUS_SUCCEEDED @@ -173,6 +174,7 @@ def run(self, action_parameters): stdin = None stdin_params = None if len(serialized_parameters) >= MAX_PARAM_LENGTH: + subprocess = concurrency.get_subprocess_module() stdin = subprocess.PIPE LOG.debug('Parameters are too big...changing 
to stdin') stdin_params = '{"parameters": %s}\n' % (serialized_parameters) diff --git a/st2common/st2common/runners/base.py b/st2common/st2common/runners/base.py index c4214fe5ca..ed4af53878 100644 --- a/st2common/st2common/runners/base.py +++ b/st2common/st2common/runners/base.py @@ -23,7 +23,6 @@ import six import yaml from oslo_config import cfg -from eventlet.green import subprocess from st2common import log as logging from st2common.constants import action as action_constants @@ -39,6 +38,9 @@ from st2common.util.api import get_full_public_api_url from st2common.util.deprecation import deprecated from st2common.util.green.shell import run_command +from st2common.util import concurrency + +subprocess = concurrency.get_subprocess_module() __all__ = [ 'ActionRunner', diff --git a/st2common/st2common/runners/parallel_ssh.py b/st2common/st2common/runners/parallel_ssh.py index 9ade2528a0..c83b4cad91 100644 --- a/st2common/st2common/runners/parallel_ssh.py +++ b/st2common/st2common/runners/parallel_ssh.py @@ -13,12 +13,12 @@ # limitations under the License. 
from __future__ import absolute_import + import json import re import os import traceback -import eventlet from paramiko.ssh_exception import SSHException from st2common.constants.secrets import MASKED_ATTRIBUTE_VALUE @@ -28,6 +28,7 @@ from st2common.exceptions.ssh import NoHostsConnectedToException import st2common.util.jsonify as jsonify from st2common.util import ip_utils +from st2common.util import concurrency as concurrency_lib LOG = logging.getLogger(__name__) @@ -65,7 +66,7 @@ def __init__(self, hosts, user=None, password=None, pkey_file=None, pkey_materia if not hosts: raise Exception('Need an non-empty list of hosts to talk to.') - self._pool = eventlet.GreenPool(concurrency) + self._pool = concurrency_lib.get_green_pool_class()(concurrency) self._hosts_client = {} self._bad_hosts = {} self._scan_interval = 0.1 @@ -88,12 +89,12 @@ def connect(self, raise_on_any_error=False): results = {} for host in self._hosts: - while not self._pool.free(): - eventlet.sleep(self._scan_interval) + while not concurrency_lib.is_green_pool_free(self._pool): + concurrency_lib.sleep(self._scan_interval) self._pool.spawn(self._connect, host=host, results=results, raise_on_any_error=raise_on_any_error) - self._pool.waitall() + concurrency_lib.green_pool_wait_all(self._pool) if self._successful_connects < 1: # We definitely have to raise an exception in this case. 
@@ -226,10 +227,10 @@ def _execute_in_pool(self, execute_method, **kwargs): for host in self._hosts_client.keys(): while not self._pool.free(): - eventlet.sleep(self._scan_interval) + concurrency_lib.sleep(self._scan_interval) self._pool.spawn(execute_method, host=host, results=results, **kwargs) - self._pool.waitall() + concurrency_lib.green_pool_wait_all(self._pool) return results def _connect(self, host, results, raise_on_any_error=False): diff --git a/st2common/st2common/runners/paramiko_ssh.py b/st2common/st2common/runners/paramiko_ssh.py index 9536b633b9..c489629fb3 100644 --- a/st2common/st2common/runners/paramiko_ssh.py +++ b/st2common/st2common/runners/paramiko_ssh.py @@ -13,14 +13,16 @@ # limitations under the License. from __future__ import absolute_import + import os import posixpath import time -import eventlet from oslo_config import cfg from six.moves import StringIO +import six + import paramiko from paramiko.ssh_exception import SSHException @@ -33,7 +35,7 @@ from st2common.util.misc import sanitize_output from st2common.util.shell import quote_unix from st2common.constants.runners import DEFAULT_SSH_PORT, REMOTE_RUNNER_PRIVATE_KEY_HEADER -import six +from st2common.util import concurrency __all__ = [ 'ParamikoSSHClient', @@ -434,7 +436,7 @@ def run(self, cmd, timeout=None, quote=False, call_line_handler_func=False): break # Short sleep to prevent busy waiting - eventlet.sleep(self.SLEEP_DELAY) + concurrency.sleep(self.SLEEP_DELAY) # print('Wait over. Channel must be ready for host: %s' % self.hostname) # Receive the exit status code of the command we ran. 
diff --git a/st2common/st2common/runners/utils.py b/st2common/st2common/runners/utils.py index 25877fb704..407a6288bc 100644 --- a/st2common/st2common/runners/utils.py +++ b/st2common/st2common/runners/utils.py @@ -153,7 +153,9 @@ def make_read_and_store_stream_func(execution_db, action_db, store_data_func): """ # NOTE: This import has intentionally been moved here to avoid massive performance overhead # (1+ second) for other functions inside this module which don't need to use those imports. - import eventlet + from st2common.util import concurrency + + greenlet_exit_exc_cls = concurrency.get_greenlet_exit_exception_class() def read_and_store_stream(stream, buff): try: @@ -176,7 +178,7 @@ def read_and_store_stream(stream, buff): except RuntimeError: # process was terminated abruptly pass - except eventlet.support.greenlets.GreenletExit: + except greenlet_exit_exc_cls: # Green thread exited / was killed pass diff --git a/st2common/st2common/services/sensor_watcher.py b/st2common/st2common/services/sensor_watcher.py index 33b1faedb7..33492acc07 100644 --- a/st2common/st2common/services/sensor_watcher.py +++ b/st2common/st2common/services/sensor_watcher.py @@ -19,12 +19,12 @@ from __future__ import absolute_import import six -import eventlet from kombu.mixins import ConsumerMixin from st2common import log as logging from st2common.transport import reactor, publishers from st2common.transport import utils as transport_utils +from st2common.util import concurrency import st2common.util.queues as queue_utils LOG = logging.getLogger(__name__) @@ -90,7 +90,7 @@ def process_task(self, body, message): def start(self): try: self.connection = transport_utils.get_connection() - self._updates_thread = eventlet.spawn(self.run) + self._updates_thread = concurrency.spawn(self.run) except: LOG.exception('Failed to start sensor_watcher.') self.connection.release() @@ -99,7 +99,7 @@ def stop(self): LOG.debug('Shutting down sensor watcher.') try: if self._updates_thread: - 
self._updates_thread = eventlet.kill(self._updates_thread) + self._updates_thread = concurrency.kill(self._updates_thread) if self.connection: channel = self.connection.channel() diff --git a/st2common/st2common/services/triggerwatcher.py b/st2common/st2common/services/triggerwatcher.py index 99d3dd402b..5137d1bae5 100644 --- a/st2common/st2common/services/triggerwatcher.py +++ b/st2common/st2common/services/triggerwatcher.py @@ -16,13 +16,13 @@ from __future__ import absolute_import import six -import eventlet from kombu.mixins import ConsumerMixin from st2common import log as logging from st2common.persistence.trigger import Trigger from st2common.transport import reactor, publishers from st2common.transport import utils as transport_utils +from st2common.util import concurrency import st2common.util.queues as queue_utils LOG = logging.getLogger(__name__) @@ -104,21 +104,21 @@ def process_task(self, body, message): finally: message.ack() - eventlet.sleep(self.sleep_interval) + concurrency.sleep(self.sleep_interval) def start(self): try: self.connection = transport_utils.get_connection() - self._updates_thread = eventlet.spawn(self.run) - self._load_thread = eventlet.spawn(self._load_triggers_from_db) + self._updates_thread = concurrency.spawn(self.run) + self._load_thread = concurrency.spawn(self._load_triggers_from_db) except: LOG.exception('Failed to start watcher.') self.connection.release() def stop(self): try: - self._updates_thread = eventlet.kill(self._updates_thread) - self._load_thread = eventlet.kill(self._load_thread) + self._updates_thread = concurrency.kill(self._updates_thread) + self._load_thread = concurrency.kill(self._load_thread) finally: self.connection.release() @@ -129,11 +129,11 @@ def stop(self): def on_consume_end(self, connection, channel): super(TriggerWatcher, self).on_consume_end(connection=connection, channel=channel) - eventlet.sleep(seconds=self.sleep_interval) + concurrency.sleep(seconds=self.sleep_interval) def 
on_iteration(self): super(TriggerWatcher, self).on_iteration() - eventlet.sleep(seconds=self.sleep_interval) + concurrency.sleep(seconds=self.sleep_interval) def _load_triggers_from_db(self): for trigger_type in self._trigger_types: diff --git a/st2common/st2common/transport/connection_retry_wrapper.py b/st2common/st2common/transport/connection_retry_wrapper.py index 0390525b81..4a0b3b1a46 100644 --- a/st2common/st2common/transport/connection_retry_wrapper.py +++ b/st2common/st2common/transport/connection_retry_wrapper.py @@ -15,7 +15,8 @@ from __future__ import absolute_import import six -import eventlet + +from st2common.util import concurrency __all__ = ['ConnectionRetryWrapper', 'ClusterRetryContext'] @@ -141,7 +142,7 @@ def run(self, connection, wrapped_callback): # -1, 0 and 1+ are handled properly by eventlet.sleep self._logger.debug('Received RabbitMQ server error, sleeping for %s seconds ' 'before retrying: %s' % (wait, six.text_type(e))) - eventlet.sleep(wait) + concurrency.sleep(wait) connection.close() # ensure_connection will automatically switch to an alternate. 
Other connections diff --git a/st2common/st2common/transport/consumers.py b/st2common/st2common/transport/consumers.py index edfaafe5d8..95d908a8b7 100644 --- a/st2common/st2common/transport/consumers.py +++ b/st2common/st2common/transport/consumers.py @@ -14,7 +14,6 @@ from __future__ import absolute_import import abc -import eventlet import six from kombu.mixins import ConsumerMixin @@ -22,6 +21,7 @@ from st2common import log as logging from st2common.util.greenpooldispatch import BufferedDispatcher +from st2common.util import concurrency __all__ = [ 'QueueConsumer', @@ -169,7 +169,7 @@ def __init__(self, connection, queues): def start(self, wait=False): LOG.info('Starting %s...', self.__class__.__name__) - self._consumer_thread = eventlet.spawn(self._queue_consumer.run) + self._consumer_thread = concurrency.spawn(self._queue_consumer.run) if wait: self.wait() diff --git a/st2common/st2common/util/concurrency.py b/st2common/st2common/util/concurrency.py index c40d2c311d..336c249824 100644 --- a/st2common/st2common/util/concurrency.py +++ b/st2common/st2common/util/concurrency.py @@ -26,6 +26,7 @@ try: import gevent # pylint: disable=import-error + import gevent.pool except ImportError: gevent = None @@ -43,7 +44,12 @@ 'cancel', 'kill', 'sleep', - 'get_greenlet_exit_exception_class' + + 'get_greenlet_exit_exception_class', + + 'get_green_pool_class', + 'is_green_pool_free', + 'green_pool_wait_all' ] @@ -131,3 +137,38 @@ def get_greenlet_exit_exception_class(): return gevent.GreenletExit else: raise ValueError('Unsupported concurrency library') + + +def get_green_pool_class(): + if CONCURRENCY_LIBRARY == 'eventlet': + return eventlet.GreenPool + elif CONCURRENCY_LIBRARY == 'gevent': + return gevent.pool.Pool + else: + raise ValueError('Unsupported concurrency library') + + +def is_green_pool_free(pool): + """ + Return True if the provided green pool is free, False otherwise. 
+ """
 + if CONCURRENCY_LIBRARY == 'eventlet': + return pool.free() + elif CONCURRENCY_LIBRARY == 'gevent': + return not pool.full() + else: + raise ValueError('Unsupported concurrency library') + + +def green_pool_wait_all(pool): + """ + Wait for all the green threads in the pool to finish. + """ + if CONCURRENCY_LIBRARY == 'eventlet': + return pool.waitall() + elif CONCURRENCY_LIBRARY == 'gevent': + # NOTE: This mimics eventlet.waitall() functionality better than + # pool.join() + return all(gl.ready() for gl in pool.greenlets) + else: + raise ValueError('Unsupported concurrency library') diff --git a/st2common/st2common/util/shell.py index 4f6a049f54..972b3e3691 100644 --- a/st2common/st2common/util/shell.py +++ b/st2common/st2common/util/shell.py @@ -13,6 +13,7 @@ # limitations under the License. from __future__ import absolute_import + import os import shlex import signal @@ -20,11 +21,13 @@ from ctypes import cdll import six -# NOTE: eventlet 0.19.0 removed support for sellect.poll() so we not only provide green version of -# subprocess functionality and run_command -from eventlet.green import subprocess from st2common import log as logging +from st2common.util import concurrency + +# NOTE: eventlet 0.19.0 removed support for select.poll() so we need to provide a green version of +# subprocess functionality and run_command +subprocess = concurrency.get_subprocess_module() __all__ = [ 'run_command', @@ -75,8 +78,8 @@ def run_command(cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE, if not env: env = os.environ.copy() - process = subprocess.Popen(cmd, stdin=stdin, stdout=stdout, stderr=stderr, - env=env, cwd=cwd, shell=shell) + process = concurrency.subprocess_popen(args=cmd, stdin=stdin, stdout=stdout, stderr=stderr, + env=env, cwd=cwd, shell=shell) stdout, stderr = process.communicate() exit_code = process.returncode From f0c5c2ee494a73abec6136d429a99e9eb0e10b24 Mon Sep 17 00:00:00 2001 From: Tomaz Muraus
Date: Fri, 29 Nov 2019 21:48:19 +0100 Subject: [PATCH 2/2] Fix lint. --- contrib/runners/python_runner/python_runner/python_runner.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/contrib/runners/python_runner/python_runner/python_runner.py b/contrib/runners/python_runner/python_runner/python_runner.py index 6d7cdea4c0..c55db2050c 100644 --- a/contrib/runners/python_runner/python_runner/python_runner.py +++ b/contrib/runners/python_runner/python_runner/python_runner.py @@ -168,13 +168,14 @@ def run(self, action_parameters): '--parent-args=%s' % (parent_args), ] + subprocess = concurrency.get_subprocess_module() + # If parameter size is larger than the maximum allowed by Linux kernel # we need to swap to stdin to communicate parameters. This avoids a # failure to fork the wrapper process when using large parameters. stdin = None stdin_params = None if len(serialized_parameters) >= MAX_PARAM_LENGTH: - subprocess = concurrency.get_subprocess_module() stdin = subprocess.PIPE LOG.debug('Parameters are too big...changing to stdin') stdin_params = '{"parameters": %s}\n' % (serialized_parameters)