Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion contrib/runners/python_runner/python_runner/python_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@
from subprocess import list2cmdline

import six
from eventlet.green import subprocess

from oslo_config import cfg
from six.moves import StringIO

from st2common import log as logging
from st2common.runners.base import GitWorktreeActionRunner
from st2common.runners.base import get_metadata as get_runner_metadata
from st2common.util import concurrency
from st2common.util.green.shell import run_command
from st2common.constants.action import ACTION_OUTPUT_RESULT_DELIMITER
from st2common.constants.action import LIVEACTION_STATUS_SUCCEEDED
Expand Down Expand Up @@ -167,6 +168,8 @@ def run(self, action_parameters):
'--parent-args=%s' % (parent_args),
]

subprocess = concurrency.get_subprocess_module()

# If parameter size is larger than the maximum allowed by Linux kernel
# we need to swap to stdin to communicate parameters. This avoids a
# failure to fork the wrapper process when using large parameters.
Expand Down
4 changes: 3 additions & 1 deletion st2common/st2common/runners/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import six
import yaml
from oslo_config import cfg
from eventlet.green import subprocess

from st2common import log as logging
from st2common.constants import action as action_constants
Expand All @@ -39,6 +38,9 @@
from st2common.util.api import get_full_public_api_url
from st2common.util.deprecation import deprecated
from st2common.util.green.shell import run_command
from st2common.util import concurrency

subprocess = concurrency.get_subprocess_module()

__all__ = [
'ActionRunner',
Expand Down
15 changes: 8 additions & 7 deletions st2common/st2common/runners/parallel_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
# limitations under the License.

from __future__ import absolute_import

import json
import re
import os
import traceback

import eventlet
from paramiko.ssh_exception import SSHException

from st2common.constants.secrets import MASKED_ATTRIBUTE_VALUE
Expand All @@ -28,6 +28,7 @@
from st2common.exceptions.ssh import NoHostsConnectedToException
import st2common.util.jsonify as jsonify
from st2common.util import ip_utils
from st2common.util import concurrency as concurrency_lib

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -65,7 +66,7 @@ def __init__(self, hosts, user=None, password=None, pkey_file=None, pkey_materia
if not hosts:
raise Exception('Need an non-empty list of hosts to talk to.')

self._pool = eventlet.GreenPool(concurrency)
self._pool = concurrency_lib.get_green_pool_class()(concurrency)
self._hosts_client = {}
self._bad_hosts = {}
self._scan_interval = 0.1
Expand All @@ -88,12 +89,12 @@ def connect(self, raise_on_any_error=False):
results = {}

for host in self._hosts:
while not self._pool.free():
eventlet.sleep(self._scan_interval)
while not concurrency_lib.is_green_pool_free(self._pool):
concurrency_lib.sleep(self._scan_interval)
self._pool.spawn(self._connect, host=host, results=results,
raise_on_any_error=raise_on_any_error)

self._pool.waitall()
concurrency_lib.green_pool_wait_all(self._pool)

if self._successful_connects < 1:
# We definitely have to raise an exception in this case.
Expand Down Expand Up @@ -226,10 +227,10 @@ def _execute_in_pool(self, execute_method, **kwargs):

for host in self._hosts_client.keys():
while not self._pool.free():
eventlet.sleep(self._scan_interval)
concurrency_lib.sleep(self._scan_interval)
self._pool.spawn(execute_method, host=host, results=results, **kwargs)

self._pool.waitall()
concurrency_lib.green_pool_wait_all(self._pool)
return results

def _connect(self, host, results, raise_on_any_error=False):
Expand Down
8 changes: 5 additions & 3 deletions st2common/st2common/runners/paramiko_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@
# limitations under the License.

from __future__ import absolute_import

import os
import posixpath
import time

import eventlet
from oslo_config import cfg
from six.moves import StringIO

import six

import paramiko
from paramiko.ssh_exception import SSHException

Expand All @@ -33,7 +35,7 @@
from st2common.util.misc import sanitize_output
from st2common.util.shell import quote_unix
from st2common.constants.runners import DEFAULT_SSH_PORT, REMOTE_RUNNER_PRIVATE_KEY_HEADER
import six
from st2common.util import concurrency

__all__ = [
'ParamikoSSHClient',
Expand Down Expand Up @@ -434,7 +436,7 @@ def run(self, cmd, timeout=None, quote=False, call_line_handler_func=False):
break

# Short sleep to prevent busy waiting
eventlet.sleep(self.SLEEP_DELAY)
concurrency.sleep(self.SLEEP_DELAY)
# print('Wait over. Channel must be ready for host: %s' % self.hostname)

# Receive the exit status code of the command we ran.
Expand Down
6 changes: 4 additions & 2 deletions st2common/st2common/runners/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ def make_read_and_store_stream_func(execution_db, action_db, store_data_func):
"""
# NOTE: This import has intentionally been moved here to avoid massive performance overhead
# (1+ second) for other functions inside this module which don't need to use those imports.
import eventlet
from st2common.util import concurrency

greenlet_exit_exc_cls = concurrency.get_greenlet_exit_exception_class()

def read_and_store_stream(stream, buff):
try:
Expand All @@ -176,7 +178,7 @@ def read_and_store_stream(stream, buff):
except RuntimeError:
# process was terminated abruptly
pass
except eventlet.support.greenlets.GreenletExit:
except greenlet_exit_exc_cls:
# Green thread exited / was killed
pass

Expand Down
6 changes: 3 additions & 3 deletions st2common/st2common/services/sensor_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
from __future__ import absolute_import

import six
import eventlet
from kombu.mixins import ConsumerMixin

from st2common import log as logging
from st2common.transport import reactor, publishers
from st2common.transport import utils as transport_utils
from st2common.util import concurrency
import st2common.util.queues as queue_utils

LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -90,7 +90,7 @@ def process_task(self, body, message):
def start(self):
try:
self.connection = transport_utils.get_connection()
self._updates_thread = eventlet.spawn(self.run)
self._updates_thread = concurrency.spawn(self.run)
except:
LOG.exception('Failed to start sensor_watcher.')
self.connection.release()
Expand All @@ -99,7 +99,7 @@ def stop(self):
LOG.debug('Shutting down sensor watcher.')
try:
if self._updates_thread:
self._updates_thread = eventlet.kill(self._updates_thread)
self._updates_thread = concurrency.kill(self._updates_thread)

if self.connection:
channel = self.connection.channel()
Expand Down
16 changes: 8 additions & 8 deletions st2common/st2common/services/triggerwatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
from __future__ import absolute_import

import six
import eventlet
from kombu.mixins import ConsumerMixin

from st2common import log as logging
from st2common.persistence.trigger import Trigger
from st2common.transport import reactor, publishers
from st2common.transport import utils as transport_utils
from st2common.util import concurrency
import st2common.util.queues as queue_utils

LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -104,21 +104,21 @@ def process_task(self, body, message):
finally:
message.ack()

eventlet.sleep(self.sleep_interval)
concurrency.sleep(self.sleep_interval)

def start(self):
try:
self.connection = transport_utils.get_connection()
self._updates_thread = eventlet.spawn(self.run)
self._load_thread = eventlet.spawn(self._load_triggers_from_db)
self._updates_thread = concurrency.spawn(self.run)
self._load_thread = concurrency.spawn(self._load_triggers_from_db)
except:
LOG.exception('Failed to start watcher.')
self.connection.release()

def stop(self):
try:
self._updates_thread = eventlet.kill(self._updates_thread)
self._load_thread = eventlet.kill(self._load_thread)
self._updates_thread = concurrency.kill(self._updates_thread)
self._load_thread = concurrency.kill(self._load_thread)
finally:
self.connection.release()

Expand All @@ -129,11 +129,11 @@ def stop(self):
def on_consume_end(self, connection, channel):
super(TriggerWatcher, self).on_consume_end(connection=connection,
channel=channel)
eventlet.sleep(seconds=self.sleep_interval)
concurrency.sleep(seconds=self.sleep_interval)

def on_iteration(self):
super(TriggerWatcher, self).on_iteration()
eventlet.sleep(seconds=self.sleep_interval)
concurrency.sleep(seconds=self.sleep_interval)

def _load_triggers_from_db(self):
for trigger_type in self._trigger_types:
Expand Down
5 changes: 3 additions & 2 deletions st2common/st2common/transport/connection_retry_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
from __future__ import absolute_import

import six
import eventlet

from st2common.util import concurrency

__all__ = ['ConnectionRetryWrapper', 'ClusterRetryContext']

Expand Down Expand Up @@ -141,7 +142,7 @@ def run(self, connection, wrapped_callback):
# -1, 0 and 1+ are handled properly by eventlet.sleep
self._logger.debug('Received RabbitMQ server error, sleeping for %s seconds '
'before retrying: %s' % (wait, six.text_type(e)))
eventlet.sleep(wait)
concurrency.sleep(wait)

connection.close()
# ensure_connection will automatically switch to an alternate. Other connections
Expand Down
4 changes: 2 additions & 2 deletions st2common/st2common/transport/consumers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@

from __future__ import absolute_import
import abc
import eventlet
import six

from kombu.mixins import ConsumerMixin
from oslo_config import cfg

from st2common import log as logging
from st2common.util.greenpooldispatch import BufferedDispatcher
from st2common.util import concurrency

__all__ = [
'QueueConsumer',
Expand Down Expand Up @@ -169,7 +169,7 @@ def __init__(self, connection, queues):

def start(self, wait=False):
LOG.info('Starting %s...', self.__class__.__name__)
self._consumer_thread = eventlet.spawn(self._queue_consumer.run)
self._consumer_thread = concurrency.spawn(self._queue_consumer.run)

if wait:
self.wait()
Expand Down
43 changes: 42 additions & 1 deletion st2common/st2common/util/concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

try:
import gevent # pylint: disable=import-error
import gevent.pool
except ImportError:
gevent = None

Expand All @@ -43,7 +44,12 @@
'cancel',
'kill',
'sleep',
'get_greenlet_exit_exception_class'

'get_greenlet_exit_exception_class',

'get_green_pool_class',
'is_green_pool_free',
'green_pool_wait_all'
]


Expand Down Expand Up @@ -131,3 +137,38 @@ def get_greenlet_exit_exception_class():
return gevent.GreenletExit
else:
raise ValueError('Unsupported concurrency library')


def get_green_pool_class():
if CONCURRENCY_LIBRARY == 'eventlet':
return eventlet.GreenPool
elif CONCURRENCY_LIBRARY == 'gevent':
return gevent.pool.Pool
else:
raise ValueError('Unsupported concurrency library')


def is_green_pool_free(pool):
"""
Return True if the provided green pool is free, False otherwise.
"""
if CONCURRENCY_LIBRARY == 'eventlet':
return pool.free()
elif CONCURRENCY_LIBRARY == 'gevent':
return not pool.full()
else:
raise ValueError('Unsupported concurrency library')


def green_pool_wait_all(pool):
"""
Wait for all the green threads in the pool to finish.
"""
if CONCURRENCY_LIBRARY == 'eventlet':
return pool.waitall()
elif CONCURRENCY_LIBRARY == 'gevent':
# NOTE: This mimicks eventlet.waitall() functionallity better than
# pool.join()
return all(gl.ready() for gl in pool.greenlets)
else:
raise ValueError('Unsupported concurrency library')
13 changes: 8 additions & 5 deletions st2common/st2common/util/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,21 @@
# limitations under the License.

from __future__ import absolute_import

import os
import shlex
import signal
from subprocess import list2cmdline
from ctypes import cdll

import six
# NOTE: eventlet 0.19.0 removed support for sellect.poll() so we not only provide green version of
# subprocess functionality and run_command
from eventlet.green import subprocess

from st2common import log as logging
from st2common.util import concurrency

# NOTE: eventlet 0.19.0 removed support for sellect.poll() so we not only provide green version of
# subprocess functionality and run_command
subprocess = concurrency.get_subprocess_module()

__all__ = [
'run_command',
Expand Down Expand Up @@ -75,8 +78,8 @@ def run_command(cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
if not env:
env = os.environ.copy()

process = subprocess.Popen(cmd, stdin=stdin, stdout=stdout, stderr=stderr,
env=env, cwd=cwd, shell=shell)
process = concurrency.subprocess_popen(args=cmd, stdin=stdin, stdout=stdout, stderr=stderr,
env=env, cwd=cwd, shell=shell)
stdout, stderr = process.communicate()
exit_code = process.returncode

Expand Down