diff --git a/.travis.yml b/.travis.yml index 42b794e571..61f50d8d2d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -67,19 +67,20 @@ addons: - mongodb-upstart - sourceline: 'deb [arch=amd64] http://repo.mongodb.org/apt/ubuntu precise/mongodb-org/3.4 multiverse' key_url: 'https://www.mongodb.org/static/pgp/server-3.4.asc' + # NOTE: Precise repo doesn't contain Erlang 20.x, latest version is 19.x so we need to use RabbitMQ 3.7.6 + #- sourceline: 'deb [arch=amd64] http://packages.erlang-solutions.com/ubuntu precise contrib' + # key_url: 'https://packages.erlang-solutions.com/ubuntu/erlang_solutions.asc' + #- sourceline: 'deb [arch=amd64] https://dl.bintray.com/rabbitmq/debian precise rabbitmq-server-v3.6.x' + # key_url: 'https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc' - sourceline: 'ppa:git-core/ppa' packages: - mongodb-org-server - mongodb-org-shell - # NOTE: Uncomment below for Xenial - # -rabbitmq-server + - erlang + - rabbitmq-server - git - libffi-dev -# NOTE: Remove / comment services section below for Xenial -services: - - rabbitmq - cache: pip: true directories: @@ -105,22 +106,27 @@ install: # Let's enable rabbitmqadmin # See https://github.com/messagebus/lapine/wiki/Testing-on-Travis. before_script: - # key_url no longer works for APT addon # Use a custom mongod.conf which uses various speed optimizations - sudo cp scripts/travis/mongod.conf /etc/mongod.conf # Clean up any old MongoDB 3.4 data files laying around and make sure mongodb user can write to it - sudo rm -rf /var/lib/mongodb ; sudo mkdir /var/lib/mongodb ; sudo chown -R mongodb:mongodb /var/lib/mongodb - sudo service mongod restart ; sleep 5 - sudo service mongod status - - sudo tail -30 /var/log/mongodb/mongod.log - - mongod --version - - git --version - - pip --version - - virtualenv --version + - sudo tail -n 30 /var/log/mongodb/mongod.log + # Use custom RabbitMQ config which enables SSL / TLS listener on port 5671 with test certs + - sudo cp scripts/travis/rabbitmq.config /etc/rabbitmq/rabbitmq.config + # Install rabbitmq_management RabbitMQ plugin + - sudo service rabbitmq-server restart ; sleep 5 - sudo rabbitmq-plugins enable rabbitmq_management - sudo wget http://guest:guest@localhost:15672/cli/rabbitmqadmin -O /usr/local/bin/rabbitmqadmin - sudo chmod +x /usr/local/bin/rabbitmqadmin - sudo service rabbitmq-server restart + - sudo tail -n 30 /var/log/rabbitmq/* + # Print various binary versions + - mongod --version + - git --version + - pip --version + - virtualenv --version # Print out various environment variables info - make play diff --git a/CHANGELOG.rst b/CHANGELOG.rst index d4d63da77a..7fb1e76a65 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -17,6 +17,15 @@ Added For backward compatibility reasons, if pack metadata file doesn't contain that attribute, it's assumed it only works with Python 2. (new feature) #4474 +* Add support for various new SSL / TLS related config options (``ssl_keyfile``, ``ssl_certfile``, + ``ssl_ca_certs``, ``ssl_certfile``, ``authentication_mechanism``) to the ``messaging`` section in + ``st2.conf`` config file. + + With those config options, user can configure things such as client based certificate + authentication, client side verification of a server certificate against a specific CA bundle, etc. + + NOTE: Those options are only supported when using a default and officially supported AMQP backend + with RabbitMQ server. (new feature) #4541 Changed ~~~~~~~ diff --git a/conf/st2.conf.sample b/conf/st2.conf.sample index d5215206e1..21db1278af 100644 --- a/conf/st2.conf.sample +++ b/conf/st2.conf.sample @@ -171,14 +171,26 @@ mask_secrets_blacklist = # comma separated list allowed here. mask_secrets = True [messaging] -# URL of the messaging server. -url = amqp://guest:guest@127.0.0.1:5672// -# How long should we wait between connection retries. -connection_retry_wait = 10000 +# Certificate file used to identify the local connection (client). +ssl_certfile = None # How many times should we retry connection before failing. connection_retries = 10 +# Use SSL / TLS to connect to the messaging server. Same as appending "?ssl=true" at the end of the connection URL string. +ssl = False +# URL of the messaging server. +url = amqp://guest:guest@127.0.0.1:5672// +# Specifies whether a certificate is required from the other side of the connection, and whether it will be validated if provided. +ssl_cert_reqs = None # URL of all the nodes in a messaging service cluster. cluster_urls = # comma separated list allowed here. +# How long should we wait between connection retries. +connection_retry_wait = 10000 +# Private keyfile used to identify the local connection against RabbitMQ. +ssl_keyfile = None +# ca_certs file contains a set of concatenated CA certificates, which are used to validate certificates passed from RabbitMQ. +ssl_ca_certs = None +# Login method to use (AMQPLAIN, PLAIN, EXTERNAL, etc.). +login_method = None [metrics] # Randomly sample and only send metrics for X% of metric operations to the backend. Default value of 1 means no sampling is done and all the metrics are sent to the backend. E.g. 0.1 would mean 10% of operations are sampled. diff --git a/conf/st2.dev.conf b/conf/st2.dev.conf index c266534710..52f9ac61dc 100644 --- a/conf/st2.dev.conf +++ b/conf/st2.dev.conf @@ -91,7 +91,14 @@ ssh_key_file = /home/vagrant/.ssh/stanley_rsa [messaging] url = amqp://guest:guest@127.0.0.1:5672/ -#url = redis://localhost:6379/0 +# Uncomment to test SSL options +#url = amqp://guest:guest@127.0.0.1:5671/ +#ssl = True +#ssl_keyfile = /data/stanley/st2tests/st2tests/fixtures/ssl_certs/client/private_key.pem +#ssl_certfile = /data/stanley/st2tests/st2tests/fixtures/ssl_certs/client/client_certificate.pem +#ssl_ca_certs = /data/stanley/st2tests/st2tests/fixtures/ssl_certs/ca/ca_certificate_bundle.pem +#ssl_cert_reqs = required +#ssl_cert_reqs = required [ssh_runner] remote_dir = /tmp diff --git a/scripts/travis/rabbitmq.config b/scripts/travis/rabbitmq.config new file mode 100644 index 0000000000..0cf25a732a --- /dev/null +++ b/scripts/travis/rabbitmq.config @@ -0,0 +1,11 @@ +[ + {rabbit, [ + {ssl_listeners, [5671]}, + {ssl_allow_poodle_attack, true}, + {ssl_options, [{cacertfile, "/home/travis/build/StackStorm/st2/st2tests/st2tests/fixtures/ssl_certs/ca/ca_certificate_bundle.pem"}, + {certfile, "/home/travis/build/StackStorm/st2/st2tests/st2tests/fixtures/ssl_certs/server/server_certificate.pem"}, + {keyfile, "/home/travis/build/StackStorm/st2/st2tests/st2tests/fixtures/ssl_certs/server/private_key.pem"}, + {verify, verify_peer}, + {fail_if_no_peer_cert, false}]} + ]} +]. diff --git a/st2actions/st2actions/notifier/notifier.py b/st2actions/st2actions/notifier/notifier.py index b41fb96d08..7cb0a3466d 100644 --- a/st2actions/st2actions/notifier/notifier.py +++ b/st2actions/st2actions/notifier/notifier.py @@ -14,10 +14,10 @@ # limitations under the License. from __future__ import absolute_import + from datetime import datetime import json -from kombu import Connection from oslo_config import cfg from st2common import log as logging @@ -268,6 +268,6 @@ def _get_runner_ref(self, action_ref): def get_notifier(): - with Connection(transport_utils.get_messaging_urls()) as conn: + with transport_utils.get_connection() as conn: return Notifier(conn, [NOTIFIER_ACTIONUPDATE_WORK_QUEUE], trigger_dispatcher=TriggerDispatcher(LOG)) diff --git a/st2actions/st2actions/resultstracker/resultstracker.py b/st2actions/st2actions/resultstracker/resultstracker.py index 954b1da42e..62f6e746ff 100644 --- a/st2actions/st2actions/resultstracker/resultstracker.py +++ b/st2actions/st2actions/resultstracker/resultstracker.py @@ -14,11 +14,11 @@ # limitations under the License. from __future__ import absolute_import + import eventlet import six from collections import defaultdict -from kombu import Connection from st2common.query.base import QueryContext from st2common import log as logging @@ -111,5 +111,5 @@ def get_querier(self, query_module_name): def get_tracker(): - with Connection(transport_utils.get_messaging_urls()) as conn: + with transport_utils.get_connection() as conn: return ResultsTracker(conn, [RESULTSTRACKER_ACTIONSTATE_WORK_QUEUE]) diff --git a/st2actions/st2actions/scheduler/entrypoint.py b/st2actions/st2actions/scheduler/entrypoint.py index 752d6849ea..811a1f7d80 100644 --- a/st2actions/st2actions/scheduler/entrypoint.py +++ b/st2actions/st2actions/scheduler/entrypoint.py @@ -14,7 +14,6 @@ # limitations under the License. from __future__ import absolute_import -from kombu import Connection from st2common import log as logging from st2common.util import date @@ -105,5 +104,5 @@ def _create_execution_queue_item_db_from_liveaction(self, liveaction, delay=None def get_scheduler_entrypoint(): - with Connection(transport_utils.get_messaging_urls()) as conn: + with transport_utils.get_connection() as conn: return SchedulerEntrypoint(conn, [ACTIONSCHEDULER_REQUEST_QUEUE]) diff --git a/st2actions/st2actions/worker.py b/st2actions/st2actions/worker.py index fdb39f0ab5..e36eb0489a 100644 --- a/st2actions/st2actions/worker.py +++ b/st2actions/st2actions/worker.py @@ -17,8 +17,6 @@ import sys import traceback -from kombu import Connection - from st2actions.container.base import RunnerContainer from st2common import log as logging from st2common.constants import action as action_constants @@ -250,5 +248,5 @@ def _resume_action(self, liveaction_db): def get_worker(): - with Connection(transport_utils.get_messaging_urls()) as conn: + with transport_utils.get_connection() as conn: return ActionExecutionDispatcher(conn, ACTIONRUNNER_QUEUES) diff --git a/st2actions/tests/integration/test_action_state_consumer.py b/st2actions/tests/integration/test_action_state_consumer.py index 668ac5c339..3061677ef0 100644 --- a/st2actions/tests/integration/test_action_state_consumer.py +++ b/st2actions/tests/integration/test_action_state_consumer.py @@ -20,8 +20,6 @@ import mock -from kombu import Connection - from st2common.transport.queues import RESULTSTRACKER_ACTIONSTATE_WORK_QUEUE from st2actions.resultstracker.resultstracker import ResultsTracker from st2common.models.db.executionstate import ActionExecutionStateDB @@ -63,7 +61,7 @@ def setUpClass(cls): @mock.patch.object(TestQuerier, 'query', mock.MagicMock(return_value=(False, {}))) def test_process_message(self): - with Connection(transport_utils.get_messaging_urls()) as conn: + with transport_utils.get_connection() as conn: tracker = ResultsTracker(conn, [RESULTSTRACKER_ACTIONSTATE_WORK_QUEUE]) tracker._bootstrap() state = ActionStateConsumerTests.get_state( diff --git a/st2common/st2common/config.py b/st2common/st2common/config.py index 0988d1683f..789a32c952 100644 --- a/st2common/st2common/config.py +++ b/st2common/st2common/config.py @@ -226,7 +226,28 @@ def register_opts(ignore_errors=False): help='How many times should we retry connection before failing.'), cfg.IntOpt( 'connection_retry_wait', default=10000, - help='How long should we wait between connection retries.') + help='How long should we wait between connection retries.'), + cfg.BoolOpt( + 'ssl', default=False, + help='Use SSL / TLS to connect to the messaging server. Same as ' + 'appending "?ssl=true" at the end of the connection URL string.'), + cfg.StrOpt( + 'ssl_keyfile', default=None, + help='Private keyfile used to identify the local connection against RabbitMQ.'), + cfg.StrOpt( + 'ssl_certfile', default=None, + help='Certificate file used to identify the local connection (client).'), + cfg.StrOpt( + 'ssl_cert_reqs', default=None, choices='none, optional, required', + help='Specifies whether a certificate is required from the other side of the ' + 'connection, and whether it will be validated if provided.'), + cfg.StrOpt( + 'ssl_ca_certs', default=None, + help='ca_certs file contains a set of concatenated CA certificates, which are ' + 'used to validate certificates passed from RabbitMQ.'), + cfg.StrOpt( + 'login_method', default=None, + help='Login method to use (AMQPLAIN, PLAIN, EXTERNAL, etc.).') ] do_register_opts(messaging_opts, 'messaging', ignore_errors) diff --git a/st2common/st2common/models/db/__init__.py b/st2common/st2common/models/db/__init__.py index 2daffd4697..04e0b3127f 100644 --- a/st2common/st2common/models/db/__init__.py +++ b/st2common/st2common/models/db/__init__.py @@ -306,11 +306,11 @@ def _get_ssl_kwargs(ssl=False, ssl_keyfile=None, ssl_certfile=None, ssl_cert_req ssl_kwargs['ssl'] = True ssl_kwargs['ssl_certfile'] = ssl_certfile if ssl_cert_reqs: - if ssl_cert_reqs is 'none': + if ssl_cert_reqs == 'none': ssl_cert_reqs = ssl_lib.CERT_NONE - elif ssl_cert_reqs is 'optional': + elif ssl_cert_reqs == 'optional': ssl_cert_reqs = ssl_lib.CERT_OPTIONAL - elif ssl_cert_reqs is 'required': + elif ssl_cert_reqs == 'required': ssl_cert_reqs = ssl_lib.CERT_REQUIRED ssl_kwargs['ssl_cert_reqs'] = ssl_cert_reqs if ssl_ca_certs: diff --git a/st2common/st2common/persistence/execution.py b/st2common/st2common/persistence/execution.py index 4df940566f..83d5a32288 100644 --- a/st2common/st2common/persistence/execution.py +++ b/st2common/st2common/persistence/execution.py @@ -19,7 +19,6 @@ from st2common.models.db.execution import ActionExecutionDB from st2common.models.db.execution import ActionExecutionOutputDB from st2common.persistence.base import Access -from st2common.transport import utils as transport_utils __all__ = [ 'ActionExecution', @@ -38,8 +37,7 @@ def _get_impl(cls): @classmethod def _get_publisher(cls): if not cls.publisher: - cls.publisher = transport.execution.ActionExecutionPublisher( - urls=transport_utils.get_messaging_urls()) + cls.publisher = transport.execution.ActionExecutionPublisher() return cls.publisher @classmethod @@ -57,8 +55,7 @@ def _get_impl(cls): @classmethod def _get_publisher(cls): if not cls.publisher: - cls.publisher = transport.execution.ActionExecutionOutputPublisher( - urls=transport_utils.get_messaging_urls()) + cls.publisher = transport.execution.ActionExecutionOutputPublisher() return cls.publisher @classmethod diff --git a/st2common/st2common/persistence/executionstate.py b/st2common/st2common/persistence/executionstate.py index 5f087e4647..f636a0008d 100644 --- a/st2common/st2common/persistence/executionstate.py +++ b/st2common/st2common/persistence/executionstate.py @@ -14,10 +14,14 @@ # limitations under the License. from __future__ import absolute_import + from st2common import transport from st2common.models.db.executionstate import actionexecstate_access from st2common.persistence import base as persistence -from st2common.transport import utils as transport_utils + +__all__ = [ + 'ActionExecutionState' +] class ActionExecutionState(persistence.Access): @@ -31,6 +35,5 @@ def _get_impl(cls): @classmethod def _get_publisher(cls): if not cls.publisher: - cls.publisher = transport.actionexecutionstate.ActionExecutionStatePublisher( - urls=transport_utils.get_messaging_urls()) + cls.publisher = transport.actionexecutionstate.ActionExecutionStatePublisher() return cls.publisher diff --git a/st2common/st2common/persistence/liveaction.py b/st2common/st2common/persistence/liveaction.py index 88ddcaad59..2eb6015e65 100644 --- a/st2common/st2common/persistence/liveaction.py +++ b/st2common/st2common/persistence/liveaction.py @@ -14,10 +14,14 @@ # limitations under the License. from __future__ import absolute_import + from st2common import transport from st2common.models.db.liveaction import liveaction_access from st2common.persistence import base as persistence -from st2common.transport import utils as transport_utils + +__all__ = [ + 'LiveAction' +] class LiveAction(persistence.StatusBasedResource): @@ -31,8 +35,7 @@ def _get_impl(cls): @classmethod def _get_publisher(cls): if not cls.publisher: - cls.publisher = transport.liveaction.LiveActionPublisher( - urls=transport_utils.get_messaging_urls()) + cls.publisher = transport.liveaction.LiveActionPublisher() return cls.publisher @classmethod diff --git a/st2common/st2common/persistence/sensor.py b/st2common/st2common/persistence/sensor.py index c7547bcf61..e941c2bca5 100644 --- a/st2common/st2common/persistence/sensor.py +++ b/st2common/st2common/persistence/sensor.py @@ -14,10 +14,14 @@ # limitations under the License. from __future__ import absolute_import + from st2common import transport from st2common.models.db.sensor import sensor_type_access from st2common.persistence.base import ContentPackResource -from st2common.transport import utils as transport_utils + +__all__ = [ + 'SensorType' +] class SensorType(ContentPackResource): @@ -31,6 +35,5 @@ def _get_impl(cls): @classmethod def _get_publisher(cls): if not cls.publisher: - cls.publisher = transport.reactor.SensorCUDPublisher( - urls=transport_utils.get_messaging_urls()) + cls.publisher = transport.reactor.SensorCUDPublisher() return cls.publisher diff --git a/st2common/st2common/persistence/trigger.py b/st2common/st2common/persistence/trigger.py index bbe207c263..01787ce38d 100644 --- a/st2common/st2common/persistence/trigger.py +++ b/st2common/st2common/persistence/trigger.py @@ -14,12 +14,18 @@ # limitations under the License. from __future__ import absolute_import + from st2common import log as logging from st2common import transport from st2common.exceptions.db import StackStormDBObjectNotFoundError from st2common.models.db.trigger import triggertype_access, trigger_access, triggerinstance_access from st2common.persistence.base import (Access, ContentPackResource) -from st2common.transport import utils as transport_utils + +__all__ = [ + 'TriggerType', + 'Trigger', + 'TriggerInstance' +] LOG = logging.getLogger(__name__) @@ -43,8 +49,7 @@ def _get_impl(cls): @classmethod def _get_publisher(cls): if not cls.publisher: - cls.publisher = transport.reactor.TriggerCUDPublisher( - urls=transport_utils.get_messaging_urls()) + cls.publisher = transport.reactor.TriggerCUDPublisher() return cls.publisher @classmethod diff --git a/st2common/st2common/persistence/workflow.py b/st2common/st2common/persistence/workflow.py index 3063dd4a9d..933460b9aa 100644 --- a/st2common/st2common/persistence/workflow.py +++ b/st2common/st2common/persistence/workflow.py @@ -19,7 +19,6 @@ from st2common.models import db from st2common.models.db import workflow as wf_db_models from st2common.persistence import base as persistence -from st2common.transport import utils as transport_utils __all__ = [ @@ -39,8 +38,7 @@ def _get_impl(cls): @classmethod def _get_publisher(cls): if not cls.publisher: - cls.publisher = transport.workflow.WorkflowExecutionPublisher( - urls=transport_utils.get_messaging_urls()) + cls.publisher = transport.workflow.WorkflowExecutionPublisher() return cls.publisher diff --git a/st2common/st2common/services/sensor_watcher.py b/st2common/st2common/services/sensor_watcher.py index 41bcecc022..27892f055e 100644 --- a/st2common/st2common/services/sensor_watcher.py +++ b/st2common/st2common/services/sensor_watcher.py @@ -20,7 +20,6 @@ from __future__ import absolute_import import eventlet from kombu.mixins import ConsumerMixin -from kombu import Connection from st2common import log as logging from st2common.transport import reactor, publishers @@ -89,7 +88,7 @@ def process_task(self, body, message): def start(self): try: - self.connection = Connection(transport_utils.get_messaging_urls()) + self.connection = transport_utils.get_connection() self._updates_thread = eventlet.spawn(self.run) except: LOG.exception('Failed to start sensor_watcher.') diff --git a/st2common/st2common/services/triggerwatcher.py b/st2common/st2common/services/triggerwatcher.py index 787f85c489..2cbe5839fc 100644 --- a/st2common/st2common/services/triggerwatcher.py +++ b/st2common/st2common/services/triggerwatcher.py @@ -15,9 +15,9 @@ # pylint: disable=assignment-from-none from __future__ import absolute_import + import eventlet from kombu.mixins import ConsumerMixin -from kombu import Connection from st2common import log as logging from st2common.persistence.trigger import Trigger @@ -108,7 +108,7 @@ def process_task(self, body, message): def start(self): try: - self.connection = Connection(transport_utils.get_messaging_urls()) + self.connection = transport_utils.get_connection() self._updates_thread = eventlet.spawn(self.run) self._load_thread = eventlet.spawn(self._load_triggers_from_db) except: diff --git a/st2common/st2common/stream/listener.py b/st2common/st2common/stream/listener.py index 73b4962524..e5b5529f24 100644 --- a/st2common/st2common/stream/listener.py +++ b/st2common/st2common/stream/listener.py @@ -18,7 +18,6 @@ import eventlet -from kombu import Connection from kombu.mixins import ConsumerMixin from oslo_config import cfg @@ -233,13 +232,13 @@ def get_listener(name): if name == 'stream': if not _stream_listener: - with Connection(transport_utils.get_messaging_urls()) as conn: + with transport_utils.get_connection() as conn: _stream_listener = StreamListener(conn) eventlet.spawn_n(listen, _stream_listener) return _stream_listener elif name == 'execution_output': if not _execution_output_listener: - with Connection(transport_utils.get_messaging_urls()) as conn: + with transport_utils.get_connection() as conn: _execution_output_listener = ExecutionOutputListener(conn) eventlet.spawn_n(listen, _execution_output_listener) return _execution_output_listener diff --git a/st2common/st2common/transport/actionexecutionstate.py b/st2common/st2common/transport/actionexecutionstate.py index a46d202f4f..87523930f0 100644 --- a/st2common/st2common/transport/actionexecutionstate.py +++ b/st2common/st2common/transport/actionexecutionstate.py @@ -16,17 +16,23 @@ # All Exchanges and Queues related to liveaction. from __future__ import absolute_import + from kombu import Exchange, Queue + from st2common.transport import publishers +__all__ = [ + 'ActionExecutionStatePublisher' +] + ACTIONEXECUTIONSTATE_XCHG = Exchange('st2.actionexecutionstate', type='topic') class ActionExecutionStatePublisher(publishers.CUDPublisher): - def __init__(self, urls): - super(ActionExecutionStatePublisher, self).__init__(urls, ACTIONEXECUTIONSTATE_XCHG) + def __init__(self): + super(ActionExecutionStatePublisher, self).__init__(exchange=ACTIONEXECUTIONSTATE_XCHG) def get_queue(name, routing_key): diff --git a/st2common/st2common/transport/announcement.py b/st2common/st2common/transport/announcement.py index 72504806af..4f9d69390a 100644 --- a/st2common/st2common/transport/announcement.py +++ b/st2common/st2common/transport/announcement.py @@ -14,13 +14,20 @@ # limitations under the License. from __future__ import absolute_import + from kombu import Exchange, Queue from st2common import log as logging from st2common.constants.trace import TRACE_CONTEXT from st2common.models.api.trace import TraceContext from st2common.transport import publishers -from st2common.transport import utils as transport_utils + +__all__ = [ + 'AnnouncementPublisher', + 'AnnouncementDispatcher', + + 'get_queue' +] LOG = logging.getLogger(__name__) @@ -29,8 +36,8 @@ class AnnouncementPublisher(object): - def __init__(self, urls): - self._publisher = publishers.PoolPublisher(urls=urls) + def __init__(self): + self._publisher = publishers.PoolPublisher() def publish(self, payload, routing_key): self._publisher.publish(payload, ANNOUNCEMENT_XCHG, routing_key) @@ -42,7 +49,7 @@ class AnnouncementDispatcher(object): """ def __init__(self, logger=LOG): - self._publisher = AnnouncementPublisher(urls=transport_utils.get_messaging_urls()) + self._publisher = AnnouncementPublisher() self._logger = logger def dispatch(self, routing_key, payload, trace_context=None): diff --git a/st2common/st2common/transport/bootstrap_utils.py b/st2common/st2common/transport/bootstrap_utils.py index 97acc622d8..eda88f2146 100644 --- a/st2common/st2common/transport/bootstrap_utils.py +++ b/st2common/st2common/transport/bootstrap_utils.py @@ -20,7 +20,6 @@ import six import retrying from oslo_config import cfg -from kombu import Connection from kombu.serialization import register from kombu.serialization import pickle from kombu.serialization import pickle_protocol @@ -141,7 +140,8 @@ def _do_predeclare_queue(channel, queue): def register_exchanges(): LOG.debug('Registering exchanges...') connection_urls = transport_utils.get_messaging_urls() - with Connection(connection_urls) as conn: + + with transport_utils.get_connection() as conn: # Use ConnectionRetryWrapper to deal with rmq clustering etc. retry_wrapper = ConnectionRetryWrapper(cluster_size=len(connection_urls), logger=LOG) diff --git a/st2common/st2common/transport/execution.py b/st2common/st2common/transport/execution.py index 0256573e93..1885a72368 100644 --- a/st2common/st2common/transport/execution.py +++ b/st2common/st2common/transport/execution.py @@ -32,13 +32,13 @@ class ActionExecutionPublisher(publishers.CUDPublisher): - def __init__(self, urls): - super(ActionExecutionPublisher, self).__init__(urls, EXECUTION_XCHG) + def __init__(self): + super(ActionExecutionPublisher, self).__init__(exchange=EXECUTION_XCHG) class ActionExecutionOutputPublisher(publishers.CUDPublisher): - def __init__(self, urls): - super(ActionExecutionOutputPublisher, self).__init__(urls, EXECUTION_OUTPUT_XCHG) + def __init__(self): + super(ActionExecutionOutputPublisher, self).__init__(exchange=EXECUTION_OUTPUT_XCHG) def get_queue(name=None, routing_key=None, exclusive=False, auto_delete=False): diff --git a/st2common/st2common/transport/liveaction.py b/st2common/st2common/transport/liveaction.py index 996d57d5b4..b2b7efe238 100644 --- a/st2common/st2common/transport/liveaction.py +++ b/st2common/st2common/transport/liveaction.py @@ -16,9 +16,18 @@ # All Exchanges and Queues related to liveaction. from __future__ import absolute_import + from kombu import Exchange, Queue + from st2common.transport import publishers +__all__ = [ + 'LiveActionPublisher', + + 'get_queue', + 'get_status_management_queue' +] + LIVEACTION_XCHG = Exchange('st2.liveaction', type='topic') LIVEACTION_STATUS_MGMT_XCHG = Exchange('st2.liveaction.status', type='topic') @@ -26,9 +35,9 @@ class LiveActionPublisher(publishers.CUDPublisher, publishers.StatePublisherMixin): - def __init__(self, urls): - publishers.CUDPublisher.__init__(self, urls, LIVEACTION_XCHG) - publishers.StatePublisherMixin.__init__(self, urls, LIVEACTION_STATUS_MGMT_XCHG) + def __init__(self): + publishers.CUDPublisher.__init__(self, exchange=LIVEACTION_XCHG) + publishers.StatePublisherMixin.__init__(self, exchange=LIVEACTION_STATUS_MGMT_XCHG) def get_queue(name, routing_key): diff --git a/st2common/st2common/transport/publishers.py b/st2common/st2common/transport/publishers.py index 76f309bc55..010c24ed39 100644 --- a/st2common/st2common/transport/publishers.py +++ b/st2common/st2common/transport/publishers.py @@ -14,15 +14,23 @@ # limitations under the License. from __future__ import absolute_import + import copy -from kombu import Connection from kombu.messaging import Producer from st2common import log as logging from st2common.metrics.base import Timer +from st2common.transport import utils as transport_utils from st2common.transport.connection_retry_wrapper import ConnectionRetryWrapper +__all__ = [ + 'PoolPublisher', + 'SharedPoolPublishers', + 'CUDPublisher', + 'StatePublisherMixin' +] + ANY_RK = '*' CREATE_RK = 'create' UPDATE_RK = 'update' @@ -32,8 +40,17 @@ class PoolPublisher(object): - def __init__(self, urls): - self.pool = Connection(urls, failover_strategy='round-robin').Pool(limit=10) + def __init__(self, urls=None): + """ + :param urls: Connection URLs to use. If not provided it uses a default value from th + config. + :type urls: ``list`` + """ + urls = urls or transport_utils.get_messaging_urls() + connection = transport_utils.get_connection(urls=urls, + connection_kwargs={'failover_strategy': + 'round-robin'}) + self.pool = connection.Pool(limit=10) self.cluster_size = len(urls) def errback(self, exc, interval): @@ -92,7 +109,8 @@ def get_publisher(self, urls): class CUDPublisher(object): - def __init__(self, urls, exchange): + def __init__(self, exchange): + urls = transport_utils.get_messaging_urls() self._publisher = SharedPoolPublishers().get_publisher(urls=urls) self._exchange = exchange @@ -110,7 +128,8 @@ def publish_delete(self, payload): class StatePublisherMixin(object): - def __init__(self, urls, exchange): + def __init__(self, exchange): + urls = transport_utils.get_messaging_urls() self._state_publisher = SharedPoolPublishers().get_publisher(urls=urls) self._state_exchange = exchange diff --git a/st2common/st2common/transport/reactor.py b/st2common/st2common/transport/reactor.py index 65670c8ffe..944407b413 100644 --- a/st2common/st2common/transport/reactor.py +++ b/st2common/st2common/transport/reactor.py @@ -20,7 +20,6 @@ from st2common.constants.trace import TRACE_CONTEXT from st2common.models.api.trace import TraceContext from st2common.transport import publishers -from st2common.transport import utils as transport_utils __all__ = [ 'TriggerCUDPublisher', @@ -50,8 +49,8 @@ class SensorCUDPublisher(publishers.CUDPublisher): Publisher responsible for publishing Trigger model CUD events. """ - def __init__(self, urls): - super(SensorCUDPublisher, self).__init__(urls, SENSOR_CUD_XCHG) + def __init__(self): + super(SensorCUDPublisher, self).__init__(exchange=SENSOR_CUD_XCHG) class TriggerCUDPublisher(publishers.CUDPublisher): @@ -59,13 +58,13 @@ class TriggerCUDPublisher(publishers.CUDPublisher): Publisher responsible for publishing Trigger model CUD events. """ - def __init__(self, urls): - super(TriggerCUDPublisher, self).__init__(urls, TRIGGER_CUD_XCHG) + def __init__(self): + super(TriggerCUDPublisher, self).__init__(exchange=TRIGGER_CUD_XCHG) class TriggerInstancePublisher(object): - def __init__(self, urls): - self._publisher = publishers.PoolPublisher(urls=urls) + def __init__(self): + self._publisher = publishers.PoolPublisher() def publish_trigger(self, payload=None, routing_key=None): # TODO: We should use trigger reference as a routing key @@ -78,7 +77,7 @@ class TriggerDispatcher(object): """ def __init__(self, logger=LOG): - self._publisher = TriggerInstancePublisher(urls=transport_utils.get_messaging_urls()) + self._publisher = TriggerInstancePublisher() self._logger = logger def dispatch(self, trigger, payload=None, trace_context=None): diff --git a/st2common/st2common/transport/utils.py b/st2common/st2common/transport/utils.py index c416578376..f71e91cc66 100644 --- a/st2common/st2common/transport/utils.py +++ b/st2common/st2common/transport/utils.py @@ -14,13 +14,21 @@ # limitations under the License. from __future__ import absolute_import + +import ssl as ssl_lib + from oslo_config import cfg +from kombu import Connection + +from st2common import log as logging __all__ = [ + 'get_connection', + 'get_messaging_urls' ] -CONF = cfg.CONF +LOG = logging.getLogger(__name__) def get_messaging_urls(): @@ -30,6 +38,82 @@ def get_messaging_urls(): :rtype: ``list`` ''' - if CONF.messaging.cluster_urls: - return CONF.messaging.cluster_urls - return [CONF.messaging.url] + if cfg.CONF.messaging.cluster_urls: + return cfg.CONF.messaging.cluster_urls + return [cfg.CONF.messaging.url] + + +def get_connection(urls=None, connection_kwargs=None): + """ + Retrieve kombu "Conection" class instance configured with all the correct + options using values from the config and provided values. + + :param connection_kwargs: Any additional connection keyword arguments passed directly to the + Connection class constructor. + :type connection_kwargs: ``dict`` + """ + urls = urls or get_messaging_urls() + connection_kwargs = connection_kwargs or {} + + kwargs = {} + + ssl_kwargs = _get_ssl_kwargs(ssl=cfg.CONF.messaging.ssl, + ssl_keyfile=cfg.CONF.messaging.ssl_keyfile, + ssl_certfile=cfg.CONF.messaging.ssl_certfile, + ssl_cert_reqs=cfg.CONF.messaging.ssl_cert_reqs, + ssl_ca_certs=cfg.CONF.messaging.ssl_ca_certs, + login_method=cfg.CONF.messaging.login_method) + + # NOTE: "connection_kwargs" argument passed to this function has precedence over config values + if len(ssl_kwargs) == 1 and ssl_kwargs['ssl'] is True: + kwargs.update({'ssl': True}) + elif len(ssl_kwargs) >= 2: + ssl_kwargs.pop('ssl') + kwargs.update({'ssl': ssl_kwargs}) + + kwargs['login_method'] = cfg.CONF.messaging.login_method + + kwargs.update(connection_kwargs) + + # NOTE: This line contains no secret values so it's OK to log it + LOG.debug('Using SSL context for RabbitMQ connection: %s' % (ssl_kwargs)) + + connection = Connection(urls, **kwargs) + return connection + + +def _get_ssl_kwargs(ssl=False, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None, + ssl_ca_certs=None, login_method=None): + """ + Return SSL keyword arguments to be used with the kombu.Connection class. + """ + ssl_kwargs = {} + + # NOTE: If "ssl" is not set to True we don't pass "ssl=False" argument to the constructor + # because user could still specify to use SSL by including "?ssl=true" query param at the + # end of the connection URL string + if ssl is True: + ssl_kwargs['ssl'] = True + + if ssl_keyfile: + ssl_kwargs['ssl'] = True + ssl_kwargs['keyfile'] = ssl_keyfile + + if ssl_certfile: + ssl_kwargs['ssl'] = True + ssl_kwargs['certfile'] = ssl_certfile + + if ssl_cert_reqs: + if ssl_cert_reqs == 'none': + ssl_cert_reqs = ssl_lib.CERT_NONE + elif ssl_cert_reqs == 'optional': + ssl_cert_reqs = ssl_lib.CERT_OPTIONAL + elif ssl_cert_reqs == 'required': + ssl_cert_reqs = ssl_lib.CERT_REQUIRED + ssl_kwargs['cert_reqs'] = ssl_cert_reqs + + if ssl_ca_certs: + ssl_kwargs['ssl'] = True + ssl_kwargs['ca_certs'] = ssl_ca_certs + + return ssl_kwargs diff --git a/st2common/st2common/transport/workflow.py b/st2common/st2common/transport/workflow.py index c9e3e58713..a199f1cc01 100644 --- a/st2common/st2common/transport/workflow.py +++ b/st2common/st2common/transport/workflow.py @@ -23,7 +23,9 @@ __all__ = [ 'WorkflowExecutionPublisher', - 'get_queue' + + 'get_queue', + 'get_status_management_queue' ] WORKFLOW_EXECUTION_XCHG = kombu.Exchange('st2.workflow', type='topic') @@ -32,9 +34,9 @@ class WorkflowExecutionPublisher(publishers.CUDPublisher, publishers.StatePublisherMixin): - def __init__(self, urls): - publishers.CUDPublisher.__init__(self, urls, WORKFLOW_EXECUTION_XCHG) - publishers.StatePublisherMixin.__init__(self, urls, WORKFLOW_EXECUTION_STATUS_MGMT_XCHG) + def __init__(self): + publishers.CUDPublisher.__init__(self, exchange=WORKFLOW_EXECUTION_XCHG) + publishers.StatePublisherMixin.__init__(self, exchange=WORKFLOW_EXECUTION_STATUS_MGMT_XCHG) def get_queue(name, routing_key): diff --git a/st2common/tests/integration/test_rabbitmq_ssl_listener.py b/st2common/tests/integration/test_rabbitmq_ssl_listener.py new file mode 100644 index 0000000000..eaa000449b --- /dev/null +++ b/st2common/tests/integration/test_rabbitmq_ssl_listener.py @@ -0,0 +1,188 @@ +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import + +import os +import ssl +import socket + +import unittest2 +from oslo_config import cfg + +from st2common.transport import utils as transport_utils + +from st2tests.fixturesloader import get_fixtures_base_path + +__all__ = [ + 'RabbitMQTLSListenerTestCase' +] + +CERTS_FIXTURES_PATH = os.path.join(get_fixtures_base_path(), 'ssl_certs/') +ON_TRAVIS = (os.environ.get('TRAVIS', 'false').lower() == 'true') + +NON_SSL_LISTENER_PORT = 5672 +SSL_LISTENER_PORT = 5671 + + +# NOTE: We only run those tests on Travis because at the moment, local vagrant dev VM doesn't +# expose RabbitMQ SSL listener by default +# TODO: Re-enable once we upgrade Travis from Precise to Xenial where latest version of RabbitMQ +# and OpenSSL is available +@unittest2.skip('Skipping until we upgrade to Xenial on Travis') +# @unittest2.skipIf(not ON_TRAVIS, 'Skipping tests because not running on Travis') +class RabbitMQTLSListenerTestCase(unittest2.TestCase): + + def setUp(self): + # Set default values + cfg.CONF.set_override(name='ssl', override=False, group='messaging') + cfg.CONF.set_override(name='ssl_keyfile', override=None, group='messaging') + cfg.CONF.set_override(name='ssl_certfile', override=None, group='messaging') + cfg.CONF.set_override(name='ssl_ca_certs', override=None, group='messaging') + cfg.CONF.set_override(name='ssl_cert_reqs', override=None, group='messaging') + + def test_non_ssl_connection_on_ssl_listener_port_failure(self): + connection = transport_utils.get_connection(urls='amqp://guest:guest@127.0.0.1:5671/') + + expected_msg_1 = '[Errno 104] Connection reset by peer' + expected_msg_2 = 'Socket closed' + + try: + connection.connect() + except Exception as e: + self.assertFalse(connection.connected) + self.assertTrue(isinstance(e, (IOError, socket.error))) + self.assertTrue(expected_msg_1 in str(e) or expected_msg_2 in str(e)) + else: + self.fail('Exception was not thrown') + + if connection: + connection.release() + + def test_ssl_connection_on_ssl_listener_success(self): + # Using query param notation + urls = 'amqp://guest:guest@127.0.0.1:5671/?ssl=true' + connection = transport_utils.get_connection(urls=urls) + + try: + self.assertTrue(connection.connect()) + self.assertTrue(connection.connected) + finally: + if connection: + connection.release() + + # Using messaging.ssl config option + cfg.CONF.set_override(name='ssl', override=True, group='messaging') + + connection = transport_utils.get_connection(urls='amqp://guest:guest@127.0.0.1:5671/') + + try: + self.assertTrue(connection.connect()) + self.assertTrue(connection.connected) + finally: + if connection: + connection.release() + + def test_ssl_connection_ca_certs_provided(self): + ca_cert_path = os.path.join(CERTS_FIXTURES_PATH, 'ca/ca_certificate_bundle.pem') + + cfg.CONF.set_override(name='ssl', override=True, group='messaging') + cfg.CONF.set_override(name='ssl_ca_certs', override=ca_cert_path, group='messaging') + + # 1. Validate server cert against a valid CA bundle (success) - cert required + cfg.CONF.set_override(name='ssl_cert_reqs', override='required', group='messaging') + + connection = transport_utils.get_connection(urls='amqp://guest:guest@127.0.0.1:5671/') + + try: + self.assertTrue(connection.connect()) + self.assertTrue(connection.connected) + finally: + if connection: + connection.release() + + # 2. Validate server cert against other CA bundle (failure) + # CA bundle which was not used to sign the server cert + ca_cert_path = os.path.join('/etc/ssl/certs/thawte_Primary_Root_CA.pem') + + cfg.CONF.set_override(name='ssl_cert_reqs', override='required', group='messaging') + cfg.CONF.set_override(name='ssl_ca_certs', override=ca_cert_path, group='messaging') + + connection = transport_utils.get_connection(urls='amqp://guest:guest@127.0.0.1:5671/') + + expected_msg = r'\[SSL: CERTIFICATE_VERIFY_FAILED\] certificate verify failed' + self.assertRaisesRegexp(ssl.SSLError, expected_msg, connection.connect) + + # 3. Validate server cert against other CA bundle (failure) + ca_cert_path = os.path.join('/etc/ssl/certs/thawte_Primary_Root_CA.pem') + + cfg.CONF.set_override(name='ssl_cert_reqs', override='optional', group='messaging') + cfg.CONF.set_override(name='ssl_ca_certs', override=ca_cert_path, group='messaging') + + connection = transport_utils.get_connection(urls='amqp://guest:guest@127.0.0.1:5671/') + + expected_msg = r'\[SSL: CERTIFICATE_VERIFY_FAILED\] certificate verify failed' + self.assertRaisesRegexp(ssl.SSLError, expected_msg, connection.connect) + + # 4. Validate server cert against other CA bundle (failure) + # We use invalid bundle but cert_reqs is none + ca_cert_path = os.path.join('/etc/ssl/certs/thawte_Primary_Root_CA.pem') + + cfg.CONF.set_override(name='ssl_cert_reqs', override='none', group='messaging') + cfg.CONF.set_override(name='ssl_ca_certs', override=ca_cert_path, group='messaging') + + connection = transport_utils.get_connection(urls='amqp://guest:guest@127.0.0.1:5671/') + + try: + self.assertTrue(connection.connect()) + self.assertTrue(connection.connected) + finally: + if connection: + connection.release() + + def test_ssl_connect_client_side_cert_authentication(self): + # 1. Success, valid client side cert provided + ssl_keyfile = os.path.join(CERTS_FIXTURES_PATH, 'client/private_key.pem') + ssl_certfile = os.path.join(CERTS_FIXTURES_PATH, 'client/client_certificate.pem') + ca_cert_path = os.path.join(CERTS_FIXTURES_PATH, 'ca/ca_certificate_bundle.pem') + + cfg.CONF.set_override(name='ssl_keyfile', override=ssl_keyfile, group='messaging') + cfg.CONF.set_override(name='ssl_certfile', override=ssl_certfile, group='messaging') + cfg.CONF.set_override(name='ssl_cert_reqs', override='required', group='messaging') + cfg.CONF.set_override(name='ssl_ca_certs', override=ca_cert_path, group='messaging') + + connection = transport_utils.get_connection(urls='amqp://guest:guest@127.0.0.1:5671/') + + try: + self.assertTrue(connection.connect()) + self.assertTrue(connection.connected) + finally: + if connection: + connection.release() + + # 2. Invalid client side cert provided - failure + ssl_keyfile = os.path.join(CERTS_FIXTURES_PATH, 'client/private_key.pem') + ssl_certfile = os.path.join(CERTS_FIXTURES_PATH, 'server/server_certificate.pem') + ca_cert_path = os.path.join(CERTS_FIXTURES_PATH, 'ca/ca_certificate_bundle.pem') + + cfg.CONF.set_override(name='ssl_keyfile', override=ssl_keyfile, group='messaging') + cfg.CONF.set_override(name='ssl_certfile', override=ssl_certfile, group='messaging') + cfg.CONF.set_override(name='ssl_cert_reqs', override='required', group='messaging') + cfg.CONF.set_override(name='ssl_ca_certs', override=ca_cert_path, group='messaging') + + connection = transport_utils.get_connection(urls='amqp://guest:guest@127.0.0.1:5671/') + + expected_msg = r'\[X509: KEY_VALUES_MISMATCH\] key values mismatch' + self.assertRaisesRegexp(ssl.SSLError, expected_msg, connection.connect) diff --git a/st2common/tests/unit/test_db.py b/st2common/tests/unit/test_db.py index 856d1919d4..bd1cc93e5b 100644 --- a/st2common/tests/unit/test_db.py +++ b/st2common/tests/unit/test_db.py @@ -14,8 +14,10 @@ # limitations under the License. from __future__ import absolute_import -import jsonschema +import ssl + +import jsonschema import mock import mongoengine.connection from oslo_config import cfg @@ -79,6 +81,55 @@ def test_get_ssl_kwargs(self): 'authentication_mechanism': 'MONGODB-X509' }) + # 3. ssl_keyfile provided + ssl_kwargs = _get_ssl_kwargs(ssl_keyfile='/tmp/keyfile') + self.assertEqual(ssl_kwargs, { + 'ssl': True, + 'ssl_keyfile': '/tmp/keyfile', + 'ssl_match_hostname': True + }) + + # 4. ssl_certfile provided + ssl_kwargs = _get_ssl_kwargs(ssl_certfile='/tmp/certfile') + self.assertEqual(ssl_kwargs, { + 'ssl': True, + 'ssl_certfile': '/tmp/certfile', + 'ssl_match_hostname': True + }) + + # 5. ssl_ca_certs provided + ssl_kwargs = _get_ssl_kwargs(ssl_ca_certs='/tmp/ca_certs') + self.assertEqual(ssl_kwargs, { + 'ssl': True, + 'ssl_ca_certs': '/tmp/ca_certs', + 'ssl_match_hostname': True + }) + + # 6. ssl_ca_certs and ssl_cert_reqs combinations + ssl_kwargs = _get_ssl_kwargs(ssl_ca_certs='/tmp/ca_certs', ssl_cert_reqs='none') + self.assertEqual(ssl_kwargs, { + 'ssl': True, + 'ssl_ca_certs': '/tmp/ca_certs', + 'ssl_cert_reqs': ssl.CERT_NONE, + 'ssl_match_hostname': True + }) + + ssl_kwargs = _get_ssl_kwargs(ssl_ca_certs='/tmp/ca_certs', ssl_cert_reqs='optional') + self.assertEqual(ssl_kwargs, { + 'ssl': True, + 'ssl_ca_certs': '/tmp/ca_certs', + 'ssl_cert_reqs': ssl.CERT_OPTIONAL, + 'ssl_match_hostname': True + }) + + ssl_kwargs = _get_ssl_kwargs(ssl_ca_certs='/tmp/ca_certs', ssl_cert_reqs='required') + self.assertEqual(ssl_kwargs, { + 'ssl': True, + 'ssl_ca_certs': '/tmp/ca_certs', + 'ssl_cert_reqs': ssl.CERT_REQUIRED, + 'ssl_match_hostname': True + }) + @mock.patch('st2common.models.db.mongoengine') def test_db_setup(self, mock_mongoengine): db_setup(db_name='name', db_host='host', db_port=12345, username='username', diff --git a/st2common/tests/unit/test_state_publisher.py b/st2common/tests/unit/test_state_publisher.py index 180805a54c..bc66df337f 100644 --- a/st2common/tests/unit/test_state_publisher.py +++ b/st2common/tests/unit/test_state_publisher.py @@ -14,6 +14,7 @@ # limitations under the License. from __future__ import absolute_import + import kombu import mock import mongoengine as me @@ -22,7 +23,7 @@ from st2common.models.db import stormbase from st2common.persistence import base as persistence from st2common.transport import publishers -from st2common.transport import utils as transport_utils + from st2tests import DbTestCase @@ -30,8 +31,8 @@ class FakeModelPublisher(publishers.StatePublisherMixin): - def __init__(self, url): - super(FakeModelPublisher, self).__init__(url, FAKE_STATE_MGMT_XCHG) + def __init__(self): + super(FakeModelPublisher, self).__init__(exchange=FAKE_STATE_MGMT_XCHG) class FakeModelDB(stormbase.StormBaseDB): @@ -49,7 +50,7 @@ def _get_impl(cls): @classmethod def _get_publisher(cls): if not cls.publisher: - cls.publisher = FakeModelPublisher(transport_utils.get_messaging_urls()) + cls.publisher = FakeModelPublisher() return cls.publisher @classmethod diff --git a/st2common/tests/unit/test_transport.py b/st2common/tests/unit/test_transport.py new file mode 100644 index 0000000000..6c217c3347 --- /dev/null +++ b/st2common/tests/unit/test_transport.py @@ -0,0 +1,80 @@ +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import ssl + +import unittest2 + +from st2common.transport.utils import _get_ssl_kwargs + +__all__ = [ + 'TransportUtilsTestCase' +] + + +class TransportUtilsTestCase(unittest2.TestCase): + def test_get_ssl_kwargs(self): + # 1. No SSL kwargs provided + ssl_kwargs = _get_ssl_kwargs() + self.assertEqual(ssl_kwargs, {}) + + # 2. ssl kwarg provided + ssl_kwargs = _get_ssl_kwargs(ssl=True) + self.assertEqual(ssl_kwargs, { + 'ssl': True + }) + + # 3. ssl_keyfile provided + ssl_kwargs = _get_ssl_kwargs(ssl_keyfile='/tmp/keyfile') + self.assertEqual(ssl_kwargs, { + 'ssl': True, + 'keyfile': '/tmp/keyfile' + }) + + # 4. ssl_certfile provided + ssl_kwargs = _get_ssl_kwargs(ssl_certfile='/tmp/certfile') + self.assertEqual(ssl_kwargs, { + 'ssl': True, + 'certfile': '/tmp/certfile' + }) + + # 5. ssl_ca_certs provided + ssl_kwargs = _get_ssl_kwargs(ssl_ca_certs='/tmp/ca_certs') + self.assertEqual(ssl_kwargs, { + 'ssl': True, + 'ca_certs': '/tmp/ca_certs' + }) + + # 6. ssl_ca_certs and ssl_cert_reqs combinations + ssl_kwargs = _get_ssl_kwargs(ssl_ca_certs='/tmp/ca_certs', ssl_cert_reqs='none') + self.assertEqual(ssl_kwargs, { + 'ssl': True, + 'ca_certs': '/tmp/ca_certs', + 'cert_reqs': ssl.CERT_NONE + }) + + ssl_kwargs = _get_ssl_kwargs(ssl_ca_certs='/tmp/ca_certs', ssl_cert_reqs='optional') + self.assertEqual(ssl_kwargs, { + 'ssl': True, + 'ca_certs': '/tmp/ca_certs', + 'cert_reqs': ssl.CERT_OPTIONAL + }) + + ssl_kwargs = _get_ssl_kwargs(ssl_ca_certs='/tmp/ca_certs', ssl_cert_reqs='required') + self.assertEqual(ssl_kwargs, { + 'ssl': True, + 'ca_certs': '/tmp/ca_certs', + 'cert_reqs': ssl.CERT_REQUIRED + }) diff --git a/st2exporter/st2exporter/worker.py b/st2exporter/st2exporter/worker.py index 49ba9a8ad0..d5a4dcd55b 100644 --- a/st2exporter/st2exporter/worker.py +++ b/st2exporter/st2exporter/worker.py @@ -15,7 +15,6 @@ import eventlet from six.moves import queue -from kombu import Connection from oslo_config import cfg from st2common import log as logging @@ -124,5 +123,5 @@ def _get_all_executions_from_db(self): def get_worker(): - with Connection(transport_utils.get_messaging_urls()) as conn: + with transport_utils.get_connection() as conn: return ExecutionsExporter(conn, [EXPORTER_WORK_QUEUE]) diff --git a/st2reactor/st2reactor/rules/worker.py b/st2reactor/st2reactor/rules/worker.py index 5d8a049e16..32c30cb14f 100644 --- a/st2reactor/st2reactor/rules/worker.py +++ b/st2reactor/st2reactor/rules/worker.py @@ -15,8 +15,6 @@ from __future__ import absolute_import -from kombu import Connection - from st2common import log as logging from st2common.constants.trace import TRACE_CONTEXT, TRACE_ID from st2common.constants import triggers as trigger_constants @@ -119,5 +117,5 @@ def _decompose_pre_ack_process_response(response): def get_worker(): - with Connection(transport_utils.get_messaging_urls()) as conn: + with transport_utils.get_connection() as conn: return TriggerInstanceDispatcher(conn, [RULESENGINE_WORK_QUEUE]) diff --git a/st2tests/st2tests/config.py b/st2tests/st2tests/config.py index f96aa774f3..60b9480d22 100644 --- a/st2tests/st2tests/config.py +++ b/st2tests/st2tests/config.py @@ -149,7 +149,34 @@ def _register_api_opts(): help='URL of the messaging server.'), cfg.ListOpt( 'cluster_urls', default=[], - help='URL of all the nodes in a messaging service cluster.') + help='URL of all the nodes in a messaging service cluster.'), + cfg.IntOpt( + 'connection_retries', default=10, + help='How many times should we retry connection before failing.'), + cfg.IntOpt( + 'connection_retry_wait', default=10000, + help='How long should we wait between connection retries.'), + cfg.BoolOpt( + 'ssl', default=False, + help='Use SSL / TLS to connect to the messaging server. Same as ' + 'appending "?ssl=true" at the end of the connection URL string.'), + cfg.StrOpt( + 'ssl_keyfile', default=None, + help='Private keyfile used to identify the local connection against RabbitMQ.'), + cfg.StrOpt( + 'ssl_certfile', default=None, + help='Certificate file used to identify the local connection (client).'), + cfg.StrOpt( + 'ssl_cert_reqs', default=None, choices='none, optional, required', + help='Specifies whether a certificate is required from the other side of the ' + 'connection, and whether it will be validated if provided.'), + cfg.StrOpt( + 'ssl_ca_certs', default=None, + help='ca_certs file contains a set of concatenated CA certificates, which are ' + 'used to validate certificates passed from RabbitMQ.'), + cfg.StrOpt( + 'login_method', default=None, + help='Login method to use (AMQPLAIN, PLAIN, EXTERNAL, etc.).') ] _register_opts(messaging_opts, group='messaging') diff --git a/st2tests/st2tests/fixtures/ssl_certs/README.md b/st2tests/st2tests/fixtures/ssl_certs/README.md new file mode 100644 index 0000000000..d54f4f1e6b --- /dev/null +++ b/st2tests/st2tests/fixtures/ssl_certs/README.md @@ -0,0 +1,10 @@ +# SSL certificates Used for Testing + +This directory contains self signed server and client certificates which are +used by the tests. + +Those certificates are issues and signed by a custom CA which is contained in the ca/ directory. + +Certificate passphrase is ``MySecretPassword``. + +NOTE: Those cerificates will expire on ``notAfter=Feb 11 15:58:38 2024 GMT``. diff --git a/st2tests/st2tests/fixtures/ssl_certs/ca/ca_certificate_bundle.cer b/st2tests/st2tests/fixtures/ssl_certs/ca/ca_certificate_bundle.cer new file mode 100644 index 0000000000..94557aa645 Binary files /dev/null and b/st2tests/st2tests/fixtures/ssl_certs/ca/ca_certificate_bundle.cer differ diff --git a/st2tests/st2tests/fixtures/ssl_certs/ca/ca_certificate_bundle.pem b/st2tests/st2tests/fixtures/ssl_certs/ca/ca_certificate_bundle.pem new file mode 100644 index 0000000000..a194ec97df --- /dev/null +++ b/st2tests/st2tests/fixtures/ssl_certs/ca/ca_certificate_bundle.pem @@ -0,0 +1,17 @@ +-----BEGIN CERTIFICATE----- +MIICxjCCAa6gAwIBAgIJALjUApUWLemKMA0GCSqGSIb3DQEBCwUAMBMxETAPBgNV +BAMMCE15VGVzdENBMB4XDTE5MDIxMjE1NTcwM1oXDTI0MDIxMTE1NTcwM1owEzER +MA8GA1UEAwwITXlUZXN0Q0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIB +AQDxVR7nSFKXUMET0WTtVNjsgD1HDdvIZcDyPGFEMNhtftPv4RmkxeFnKNumHbIu +s2eox6MCT7wK9CKG+38szyMMDkCObYkGCKzZG2yejkjs6Kv74hvML8p+NIz3Cxch +WEuD6ubnSoKl35cVt4/LUTM/IFG36H6f7Q47NYYsWIBMaXUvY5Wbg5SqxD4LMKkx +uDFzITyrA38xvwb96mTkXT/OJEyswAAeWjjoKHWdirknhiFvKXi1T9jdmJTwBnGz +lFUS1Aavkj/Og7el9JjoL6S83mclDPbcD68/kWUliHHr8l1wfAP/oObOm7wpXViU +64nFnHP0/WtTM50urnWjFYjVAgMBAAGjHTAbMAwGA1UdEwQFMAMBAf8wCwYDVR0P +BAQDAgEGMA0GCSqGSIb3DQEBCwUAA4IBAQDwqchOuO85hfRb25LMeB9T0iEpwQdY +cKCD1ASg42Sp/mzlscPmODxILnvm3BItEKbq6mrG2s9i42FRmXu+6D2Bm7k1jDnh +FW/hI5KG5ULQWfkFqgUAWyeSKTF7oK9FRAfROY3K9E/MXxsO10e+ibgZPZjY8RTC +eUihRw3LvIFj3mY3OQ+sBQ4OTh/nPd66trzAJee15ATC0nK0YJTVhLv576DmxOyb +yuESg2l8qvjXI0C/W+MyLCO4sH1hhg+5pjEwiXH3Z1Sk59l7qag21kp53xhvjL7W ++zisXvuZC08wfCPc3RJ6ThRb8MZZKeFpOffVVHBtgv9Aes7IOyVG15XA +-----END CERTIFICATE----- diff --git a/st2tests/st2tests/fixtures/ssl_certs/ca/certs/01.pem b/st2tests/st2tests/fixtures/ssl_certs/ca/certs/01.pem new file mode 100644 index 0000000000..17c4490f8b --- /dev/null +++ b/st2tests/st2tests/fixtures/ssl_certs/ca/certs/01.pem @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIIC4jCCAcqgAwIBAgIBATANBgkqhkiG9w0BAQsFADATMREwDwYDVQQDDAhNeVRl +c3RDQTAeFw0xOTAyMTIxNTU4MDdaFw0yNDAyMTExNTU4MDdaMCUxEjAQBgNVBAMM +CWxvY2FsaG9zdDEPMA0GA1UECgwGc2VydmVyMIIBIjANBgkqhkiG9w0BAQEFAAOC +AQ8AMIIBCgKCAQEAuLLUdbHqOsUiRnkv2S0fiadrqwfdgaZgVImvMyorVYzoJ5W7 +anJSyWPnV/ly/rjL7toiPhBcVgDuCGkf7CjPN4E5tdxI9ylYk/UHEtMG1ll6kDiF +8hWfHDdktdqnQvuLkUMAA5xgIFfX+UMBuTZk7VowrjnOuljN5eVN89y2fYXXtqC1 +91HilG9VwLewYKQd/Ishb4p2WfxiBIVO+cQpnYB6quvrEYC1XPcRbJuXdrc7KcYn +dWdoj6M7aT1zOnHJrdLtv7F7dkYgV9vqwN7w3ud7uNaEbsHvWz0i+6qjX/uE755N +ZoJ8O8Dx5ug/1lxplnXlfmadIibYPBJatRsSiwIDAQABoy8wLTAJBgNVHRMEAjAA +MAsGA1UdDwQEAwIFoDATBgNVHSUEDDAKBggrBgEFBQcDATANBgkqhkiG9w0BAQsF +AAOCAQEAnhmIUhZwweCqdzGNeoNXXkuXyBf2fFvajHlG2a2pZ8r6/fyQbbJgzo04 +ajjWoUoSW+XB+AfJvT6CTZuMWsGkxYvFAxOoXtLpW0OKqEh55q8diMSb/gOxxwND +vHVb1+VjZBhzxxt0TbXeFngMnBSgVhipKQe49pe0H+rDDYptultl81n2zFLzBKUe +h927CnTJ7cpZe4Di2tMJfVsDJB6piuwPu6GnWhT38Q12I+ryL2xbihIw1B4qDtq6 +nq4lYGnpJCNNXg5JR5S1HeYiQtP0sHgU6SvpgMtzDdbCJ0Nu7EpR5J3ChdQWooGf +uTOThX41qx1p47ho4TA9Ac4K/GRcLg== +-----END CERTIFICATE----- diff --git a/st2tests/st2tests/fixtures/ssl_certs/ca/certs/02.pem b/st2tests/st2tests/fixtures/ssl_certs/ca/certs/02.pem new file mode 100644 index 0000000000..a10ae91143 --- /dev/null +++ b/st2tests/st2tests/fixtures/ssl_certs/ca/certs/02.pem @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIIC4jCCAcqgAwIBAgIBAjANBgkqhkiG9w0BAQsFADATMREwDwYDVQQDDAhNeVRl +c3RDQTAeFw0xOTAyMTIxNTU4MzhaFw0yNDAyMTExNTU4MzhaMCUxEjAQBgNVBAMM +CWxvY2FsaG9zdDEPMA0GA1UECgwGY2xpZW50MIIBIjANBgkqhkiG9w0BAQEFAAOC +AQ8AMIIBCgKCAQEA4HxkZw50MGiWYmlrwJBHAjwsD7lfft9gHrRAeP8iEI0oLIJm +/MmUUIyA2DSDGJCIsP+grkmZawLmu7D0vJIVIUo+OBNUQ/3mACWH9z15AW5s/Ig/ +FZErhBg3RFZS+hXVT639U94uKne+mjh/G4Ej7OYHhBywn+EKakIJuUTs10sF0kW/ +4h1Gx9+Ph3tfYSagNdMDXXft0Knn/X8vMwLF5Eg8ZHKnty30wJRr4r2bqTeSCPS5 +k3bfpcxOAnaSpTDuIoxIp7w9pjwLVAVWvbjqDlU5DrPxpsn29i8STNpJ7My7+12/ +C/QJDrlCJCav1ma04G2QZbyAri3ax/MCeonFsQIDAQABoy8wLTAJBgNVHRMEAjAA +MAsGA1UdDwQEAwIFoDATBgNVHSUEDDAKBggrBgEFBQcDAjANBgkqhkiG9w0BAQsF +AAOCAQEAI+PgF1gsQckqTh71CxqKimM0h5pIGh6H09bSa+9LFLFa60E1zR8rmygw +AD+u6sI5foFbSdUiIDJBmHizvwMmIptGSRw0Znzi/jjbjBmZSNLnk+Vird5grjF4 +Pf7Vkgi/NKzXTS3Y2TUUhk5OZZ6OmszHZ0eGJlUcz6Qa13hcalVHc3FmikeAu5/h +XQuthOQDXJBabgexQ+1K6ft6DDImdQCFcZhYXSb30cRHS9lqIVZbI7Rtk6UqwkvE +hYU0g8BVeVBpL7xYBqfrpdy+vBb28rrLT6Dvgf0giQ3F07S+RAivDWjM53Wyhb7T +6o3h8l49IkcEW1mns9Mj2bPNFSOhSA== +-----END CERTIFICATE----- diff --git a/st2tests/st2tests/fixtures/ssl_certs/ca/index.txt b/st2tests/st2tests/fixtures/ssl_certs/ca/index.txt new file mode 100644 index 0000000000..ad058db53d --- /dev/null +++ b/st2tests/st2tests/fixtures/ssl_certs/ca/index.txt @@ -0,0 +1,2 @@ +V 240211155807Z 01 unknown /CN=localhost/O=server +V 240211155838Z 02 unknown /CN=localhost/O=client diff --git a/st2tests/st2tests/fixtures/ssl_certs/ca/index.txt.attr b/st2tests/st2tests/fixtures/ssl_certs/ca/index.txt.attr new file mode 100644 index 0000000000..8f7e63a347 --- /dev/null +++ b/st2tests/st2tests/fixtures/ssl_certs/ca/index.txt.attr @@ -0,0 +1 @@ +unique_subject = yes diff --git a/st2tests/st2tests/fixtures/ssl_certs/ca/index.txt.attr.old b/st2tests/st2tests/fixtures/ssl_certs/ca/index.txt.attr.old new file mode 100644 index 0000000000..8f7e63a347 --- /dev/null +++ b/st2tests/st2tests/fixtures/ssl_certs/ca/index.txt.attr.old @@ -0,0 +1 @@ +unique_subject = yes diff --git a/st2tests/st2tests/fixtures/ssl_certs/ca/index.txt.old b/st2tests/st2tests/fixtures/ssl_certs/ca/index.txt.old new file mode 100644 index 0000000000..970c83b368 --- /dev/null +++ b/st2tests/st2tests/fixtures/ssl_certs/ca/index.txt.old @@ -0,0 +1 @@ +V 240211155807Z 01 unknown /CN=localhost/O=server diff --git a/st2tests/st2tests/fixtures/ssl_certs/ca/openssl.cnf b/st2tests/st2tests/fixtures/ssl_certs/ca/openssl.cnf new file mode 100644 index 0000000000..a8348fbf15 --- /dev/null +++ b/st2tests/st2tests/fixtures/ssl_certs/ca/openssl.cnf @@ -0,0 +1,54 @@ +[ ca ] +default_ca = testca + +[ testca ] +dir = . +certificate = $dir/ca_certificate_bundle.pem +database = $dir/index.txt +new_certs_dir = $dir/certs +private_key = $dir/private/ca_private_key.pem +serial = $dir/serial + +default_crl_days = 7 +default_days = 1825 +default_md = sha256 + +policy = testca_policy +x509_extensions = certificate_extensions + +[ testca_policy ] +commonName = supplied +stateOrProvinceName = optional +countryName = optional +emailAddress = optional +organizationName = optional +organizationalUnitName = optional +domainComponent = optional + +[ certificate_extensions ] +basicConstraints = CA:false + +[ req ] +default_bits = 2048 +default_keyfile = ./private/ca_private_key.pem +default_md = sha256 +prompt = yes +distinguished_name = root_ca_distinguished_name +x509_extensions = root_ca_extensions + +[ root_ca_distinguished_name ] +commonName = hostname + +[ root_ca_extensions ] +basicConstraints = CA:true +keyUsage = keyCertSign, cRLSign + +[ client_ca_extensions ] +basicConstraints = CA:false +keyUsage = digitalSignature,keyEncipherment +extendedKeyUsage = 1.3.6.1.5.5.7.3.2 + +[ server_ca_extensions ] +basicConstraints = CA:false +keyUsage = digitalSignature,keyEncipherment +extendedKeyUsage = 1.3.6.1.5.5.7.3.1 diff --git a/st2tests/st2tests/fixtures/ssl_certs/ca/private/ca_private_key.pem b/st2tests/st2tests/fixtures/ssl_certs/ca/private/ca_private_key.pem new file mode 100644 index 0000000000..e54d4958cd --- /dev/null +++ b/st2tests/st2tests/fixtures/ssl_certs/ca/private/ca_private_key.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDxVR7nSFKXUMET +0WTtVNjsgD1HDdvIZcDyPGFEMNhtftPv4RmkxeFnKNumHbIus2eox6MCT7wK9CKG ++38szyMMDkCObYkGCKzZG2yejkjs6Kv74hvML8p+NIz3CxchWEuD6ubnSoKl35cV +t4/LUTM/IFG36H6f7Q47NYYsWIBMaXUvY5Wbg5SqxD4LMKkxuDFzITyrA38xvwb9 +6mTkXT/OJEyswAAeWjjoKHWdirknhiFvKXi1T9jdmJTwBnGzlFUS1Aavkj/Og7el +9JjoL6S83mclDPbcD68/kWUliHHr8l1wfAP/oObOm7wpXViU64nFnHP0/WtTM50u +rnWjFYjVAgMBAAECggEBAN14Pz8CyQCiFD5KqHOArP4FBbciSbMTZkknDiAVL1j0 +zixSiEUFb8BK55//mphu/c8PPlINuETZHKKBRIlrof8bSTUr4laOOYmYOEsdymDX +eZVTQC1XIl5FfaPtIpHwRITQWoyhfVoZ4b4FUcnFP+FLmJLMov/C/Y9qpDIoGb2E +NbcMEnIz0i573+Ci1k+OLAdthbCigUvwvJ1iLv5m3s1XrRvIu6TDsERXdB/02pFu +XXNgyidR6XVr/MVov898PB5B0eJbX6Iir7avzpS1V/q5kq2pgFFZk8Vfhvw2k07C +l89peWIo+1h8djem/1n1FLD7aRKzFTb6HULS4uoxCDUCgYEA/o23BbC1/LRTq0IW +7I8BqTCe70cnuvWCUtWiTNWzX3INK4Hsxdeztfiu/W5dTiDndZ4CzMAoXt/rxXkw +Dc449FB1wVKCKShZRyeyyboOCpfzW1040JhjmGU4ZBn6T4U2cpaJyLGtcfkFZSeq +2nOiUntVJcPq6vWF2sdJysGSWucCgYEA8rQsf5RaHM6yRFpKrbb+SC8PAEqAZF9N +XZvl64GLHi9PSK/n68iZw1c7j4VjnCC89PH0flpQfkngrffLiy2pi+JdYo7qBKeT +3IFOiQAvylpxCiQMvFqsxz9mhoj3jJdyNGvKXJeQ5PuxRatZOHwpMP+tpQ7uF2zm +DzReoxqZ4uMCgYB1XNFthjPh9yI8a5Q2LRkO8KPWnm/q+xbDKkxSMJUrBGKeFKEd +9n2dALNtlVzfkLwmtluEG3SBiawit+U3+ES6H/6qy2fHohrHe74q0+V1bOl+zlRL +mHcS5FhDjtaho0GfQ1jzdzgIvE+Ie+mCHp5QeRyg9NtyyRCV9hxHp0fbMQKBgQDr +Cqn9c8JBG7twjrC7wvhHF6vDcGMe0VyvRwdHJ9F+jfqOPiywHzkqABTiTR/GV74m +yRsqMnS5mPpKACvSwYnsunANvrHLiC6d4WwZKWEe6q+GTps23eltnGzB5Ws3cINd +WPZE7VOZLlbjTam+FiAeH74el3LkpMW3+9OayWw2WQKBgQD0S0L5OoRjVY6SRPe1 +oKqTwSlay2uzqoAhGQqGeb4SaBaImEfLMQzYQpJ5JWAnAzwHhA7x7iDm3QzB93Fg +id1rdsbfzdlZC40T0IslTYLT/mawiOcAHupDuszgnn1ycFV35915zP9Ijzqaojsn +DRI3H6XpQSJyHUNZo1pCZBXyhg== +-----END PRIVATE KEY----- diff --git a/st2tests/st2tests/fixtures/ssl_certs/ca/serial b/st2tests/st2tests/fixtures/ssl_certs/ca/serial new file mode 100644 index 0000000000..75016ea362 --- /dev/null +++ b/st2tests/st2tests/fixtures/ssl_certs/ca/serial @@ -0,0 +1 @@ +03 diff --git a/st2tests/st2tests/fixtures/ssl_certs/ca/serial.old b/st2tests/st2tests/fixtures/ssl_certs/ca/serial.old new file mode 100644 index 0000000000..9e22bcb8e3 --- /dev/null +++ b/st2tests/st2tests/fixtures/ssl_certs/ca/serial.old @@ -0,0 +1 @@ +02 diff --git a/st2tests/st2tests/fixtures/ssl_certs/client/client_certificate.p12 b/st2tests/st2tests/fixtures/ssl_certs/client/client_certificate.p12 new file mode 100644 index 0000000000..7feead70f4 Binary files /dev/null and b/st2tests/st2tests/fixtures/ssl_certs/client/client_certificate.p12 differ diff --git a/st2tests/st2tests/fixtures/ssl_certs/client/client_certificate.pem b/st2tests/st2tests/fixtures/ssl_certs/client/client_certificate.pem new file mode 100644 index 0000000000..a10ae91143 --- /dev/null +++ b/st2tests/st2tests/fixtures/ssl_certs/client/client_certificate.pem @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIIC4jCCAcqgAwIBAgIBAjANBgkqhkiG9w0BAQsFADATMREwDwYDVQQDDAhNeVRl +c3RDQTAeFw0xOTAyMTIxNTU4MzhaFw0yNDAyMTExNTU4MzhaMCUxEjAQBgNVBAMM +CWxvY2FsaG9zdDEPMA0GA1UECgwGY2xpZW50MIIBIjANBgkqhkiG9w0BAQEFAAOC +AQ8AMIIBCgKCAQEA4HxkZw50MGiWYmlrwJBHAjwsD7lfft9gHrRAeP8iEI0oLIJm +/MmUUIyA2DSDGJCIsP+grkmZawLmu7D0vJIVIUo+OBNUQ/3mACWH9z15AW5s/Ig/ +FZErhBg3RFZS+hXVT639U94uKne+mjh/G4Ej7OYHhBywn+EKakIJuUTs10sF0kW/ +4h1Gx9+Ph3tfYSagNdMDXXft0Knn/X8vMwLF5Eg8ZHKnty30wJRr4r2bqTeSCPS5 +k3bfpcxOAnaSpTDuIoxIp7w9pjwLVAVWvbjqDlU5DrPxpsn29i8STNpJ7My7+12/ +C/QJDrlCJCav1ma04G2QZbyAri3ax/MCeonFsQIDAQABoy8wLTAJBgNVHRMEAjAA +MAsGA1UdDwQEAwIFoDATBgNVHSUEDDAKBggrBgEFBQcDAjANBgkqhkiG9w0BAQsF +AAOCAQEAI+PgF1gsQckqTh71CxqKimM0h5pIGh6H09bSa+9LFLFa60E1zR8rmygw +AD+u6sI5foFbSdUiIDJBmHizvwMmIptGSRw0Znzi/jjbjBmZSNLnk+Vird5grjF4 +Pf7Vkgi/NKzXTS3Y2TUUhk5OZZ6OmszHZ0eGJlUcz6Qa13hcalVHc3FmikeAu5/h +XQuthOQDXJBabgexQ+1K6ft6DDImdQCFcZhYXSb30cRHS9lqIVZbI7Rtk6UqwkvE +hYU0g8BVeVBpL7xYBqfrpdy+vBb28rrLT6Dvgf0giQ3F07S+RAivDWjM53Wyhb7T +6o3h8l49IkcEW1mns9Mj2bPNFSOhSA== +-----END CERTIFICATE----- diff --git a/st2tests/st2tests/fixtures/ssl_certs/client/private_key.pem b/st2tests/st2tests/fixtures/ssl_certs/client/private_key.pem new file mode 100644 index 0000000000..7ddd509e15 --- /dev/null +++ b/st2tests/st2tests/fixtures/ssl_certs/client/private_key.pem @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEA4HxkZw50MGiWYmlrwJBHAjwsD7lfft9gHrRAeP8iEI0oLIJm +/MmUUIyA2DSDGJCIsP+grkmZawLmu7D0vJIVIUo+OBNUQ/3mACWH9z15AW5s/Ig/ +FZErhBg3RFZS+hXVT639U94uKne+mjh/G4Ej7OYHhBywn+EKakIJuUTs10sF0kW/ +4h1Gx9+Ph3tfYSagNdMDXXft0Knn/X8vMwLF5Eg8ZHKnty30wJRr4r2bqTeSCPS5 +k3bfpcxOAnaSpTDuIoxIp7w9pjwLVAVWvbjqDlU5DrPxpsn29i8STNpJ7My7+12/ +C/QJDrlCJCav1ma04G2QZbyAri3ax/MCeonFsQIDAQABAoIBAFjujqwRGtCOrn0A +PJLF1Yu6IM595qoRfjfLuvr0QB+EfFTduEUO6rXaY7TDYOgbYjuUmahSOfgd5yCW +Iu6NhNdyXSHD7o8dB8ApHitBbC23/G8y3qMBptam7UYiWK8AdUgiqohOLcXfOGBK +X3ia+YuBOZsJ7qL3+TNNRCLkfltvfA4pkCMgfdZUecJcc0jFNMoCBiyk61CnNhLL +uy1oMS7JzqPRM1ySWCdBJFkV1omDHgrgBx7VmympFUJHb6kVUSh/mnPTejTcM1ds +BkNecBbS/w2X9Gb9PSZzLCAEwmJ8J0hRkgDiahN7Q/kNsQ3ca3r03iocJALecBsW +3sujeH0CgYEA+5ewcq9M/sxdZnuZy69v7T2j8Q/FGGF7IQHlT67r80cEtXeAjlrN +0D9I3+cOrvz57Eay0n2hGLWzhyex6TTX9pZozTjcMuqRkB2ztPp3HkjRucpVhGz4 +pbADvO+ZgO87AGW13E8BBDN8BsWHPFpWpwpHvEcp05sFeUdeGqJfcHsCgYEA5Gsj +dndnmxX63it2Fa3I05MynAiqnt9MNm6zcNqPMKauK6xaawZv5FvQSd5MUQa9sj3s +VgYKr9e61u7WMaHqNwn6BUOwMKv26lwjkXW/wV3QMNzn5bzS2CyjWJEjdPq0WqoH +RRvR455mAlhTVFSyOJ279WXUWoPxqDbd/Y+1yMMCgYAlDqmxqrpniUh0kN4NT1Do +G70rA4yfU7RkHzhcbUJZuesqo2hvD1bjRn8AY7MY+TACqkMql9CDqDfCP4mH9P2e +V3cmSyq74SsBlC5lCMNE1ar2d6Py9m4FUZCrYos0n4gMPe70fTqEGOU6xhtuO0wq +HGyGgeDaRyoeO/HTcHkoQwKBgQCFqaQw2KKKAAyzIV+SRAV2uXYuFGwzV5uzZoge +i+aqo37cE5k9c6DaUlfKQgkKiRVMTiwUEqkCSQ0OZOh2VrdFydLCbd+WO6rbbVtq +7SpursT7MumIaDxBP62+UAAdne8X9tMWP7dMqQ4sZR8uA/neY37vlMz0wq0QsDqq +/AN2HQKBgQDZQIZuZwS12f2Mt/E/27I8lyDiVEj59zwxeayxFq8SzUtbWnWeepes +vtsdF19dWXzwI8MjTDhGo45YyKwtNXMp+uiMA0QFo4R07D68VrxAUDYGgnhhAxlZ +Wmq8OapkJUp69GeDgnG0F72eMhrQu6fJN1dpvNAkfZiuyT2BGBc6cA== +-----END RSA PRIVATE KEY----- diff --git a/st2tests/st2tests/fixtures/ssl_certs/client/req.pem b/st2tests/st2tests/fixtures/ssl_certs/client/req.pem new file mode 100644 index 0000000000..58e270e22a --- /dev/null +++ b/st2tests/st2tests/fixtures/ssl_certs/client/req.pem @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE REQUEST----- +MIICajCCAVICAQAwJTESMBAGA1UEAwwJbG9jYWxob3N0MQ8wDQYDVQQKDAZjbGll +bnQwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDgfGRnDnQwaJZiaWvA +kEcCPCwPuV9+32AetEB4/yIQjSgsgmb8yZRQjIDYNIMYkIiw/6CuSZlrAua7sPS8 +khUhSj44E1RD/eYAJYf3PXkBbmz8iD8VkSuEGDdEVlL6FdVPrf1T3i4qd76aOH8b +gSPs5geEHLCf4QpqQgm5ROzXSwXSRb/iHUbH34+He19hJqA10wNdd+3Qqef9fy8z +AsXkSDxkcqe3LfTAlGvivZupN5II9LmTdt+lzE4CdpKlMO4ijEinvD2mPAtUBVa9 +uOoOVTkOs/Gmyfb2LxJM2knszLv7Xb8L9AkOuUIkJq/WZrTgbZBlvICuLdrH8wJ6 +icWxAgMBAAGgADANBgkqhkiG9w0BAQsFAAOCAQEApuP6zTVRGLa69IXIyGIqDzb6 +NjQxyTbB5SzbtgqvdcBs5EuntsFTmS11umKwzoqT0+Kf3JtwO8pu8rQbX3C/EWOP +/eWqFPnGTCRk0AE+m08XxiAgQrgOxiMj483ka6Qr3OdT7zjW6xUyE0ObD+auD+fx +9siygGy8P9X0x0PqpWQoZm17x3bUfERiIl+oI/BltuUmAfPgELtEIBjcz+Xrslgl +5iV8Rn/+srFwMT80QLt9iypt0Me8IkbKTWpDUVQYEaXA3svCvGuthzeukImmmAPZ +rpcXR6WvYVdb2HekgqZtgvDg4FDeLidK164uTeOlCC/CRLPKyJu9VJpTQamC6g== +-----END CERTIFICATE REQUEST----- diff --git a/st2tests/st2tests/fixtures/ssl_certs/server/private_key.pem b/st2tests/st2tests/fixtures/ssl_certs/server/private_key.pem new file mode 100644 index 0000000000..05924ec179 --- /dev/null +++ b/st2tests/st2tests/fixtures/ssl_certs/server/private_key.pem @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEowIBAAKCAQEAuLLUdbHqOsUiRnkv2S0fiadrqwfdgaZgVImvMyorVYzoJ5W7 +anJSyWPnV/ly/rjL7toiPhBcVgDuCGkf7CjPN4E5tdxI9ylYk/UHEtMG1ll6kDiF +8hWfHDdktdqnQvuLkUMAA5xgIFfX+UMBuTZk7VowrjnOuljN5eVN89y2fYXXtqC1 +91HilG9VwLewYKQd/Ishb4p2WfxiBIVO+cQpnYB6quvrEYC1XPcRbJuXdrc7KcYn +dWdoj6M7aT1zOnHJrdLtv7F7dkYgV9vqwN7w3ud7uNaEbsHvWz0i+6qjX/uE755N +ZoJ8O8Dx5ug/1lxplnXlfmadIibYPBJatRsSiwIDAQABAoIBAQC0UxytYCvwfyFs +rsrxfWWqLsQm8oHoH/ky8E4WZRhz6SOL6ltVnRKIvzpSISCN4vxwUZZXBAAyk6vS +mFhraJiPd2JR1SWD8mEh63uhfFjTk/7eqeDUrxluIgL4rebZtd/YzhJIdDdBvKIH +Ic2f96RoO8MFhzj3pNY5mzwVWCrvtsEY4ygrblQrweqNbcaowJ/YQPPkgvXb6dC3 +IXjBL5IzOwTlnIYhFkuZY736Z8GOw9rcyGxITHAKavWOJkE72drh0gv5rBnu2NLz +Lgta6o+p6/DU1tjq2LRllq1HDL7uy5yGxBtB+uXly22Ur/rQzYBKeRHkj2OqZKlV +kNiyKBipAoGBAOMkqqTu9dd8xPCgu8iQWHlKVwL6gp4Ij/0PCpXL5v5cktyoAvd9 +fb22UGeFLbbdUuctO711oMfMXl8nULafT54WbnSCG2f+oiRacupJQ/QLPQ8nV8Gy +K9+H/rYZ+ggLNkNqjvM5xQZ6/AxZxWEv+qNJfPF0fG1iCWmYh0OrmfDdAoGBANAp +vma47lG3dnQfga88//SJCeuluwumjXvN8gQJvwU1ofaGjRdKxtexWBuZG6BPXnCv +yRm5tWYJnxj+zUF+ImMsd7sd/Iy1PW7gdZtMtjIW4Qmys0IKK3zkwGygayFrnyhg +WU0t63OEiKEJ7mQzvOAmnTG+H7fZ6WWm3gxi+WaHAoGAYDda9YynpMUcY1Wi1d2X +LKG54/AbvjegTrC9aiC6U4sBRukAgLeuuNruijtW1vw/rt9xS9r05U2DuEjeHs2z +GyMjXMT0OQQayM1rmiS43TqZfb7LpKgFf6WK1raAPEILlVkg/pS9Cfa0p8KrInUB +dYOeomUWg/sgQ5Ox0I9zIR0CgYAYxl8a6reykhtPBtDwgloUSJsdqMPyRwhfy8sa +H+7UN+Xm6WyxcPzpfvn1juty0P90efd9UFT+p/Z/ixPyz4hYNVqqso70UD3XjG9y +5FZq774o4VPkcEFsw+0DALS/bYerzovSW7zCKuv3/q6Yzm+UXgQnf3FW+GCG8K1M +3BrC0QKBgC6srVlHBF9FI1D/9yjjx3JIVmKKS7YleAl36t05zCfR46FDPPa7J4/+ +1UzBkEFkn0/Ven8bbkOKr9v7wBjxszCnvZPxDm9oGU8l8TjrZYiuwi0euF+4r61v +HYueOtTDjtOYSPXbQcypA0FjdeHPE5XY6O4I8ti9URyV+M80vijk +-----END RSA PRIVATE KEY----- diff --git a/st2tests/st2tests/fixtures/ssl_certs/server/req.pem b/st2tests/st2tests/fixtures/ssl_certs/server/req.pem new file mode 100644 index 0000000000..5135c2cc33 --- /dev/null +++ b/st2tests/st2tests/fixtures/ssl_certs/server/req.pem @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE REQUEST----- +MIICajCCAVICAQAwJTESMBAGA1UEAwwJbG9jYWxob3N0MQ8wDQYDVQQKDAZzZXJ2 +ZXIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC4stR1seo6xSJGeS/Z +LR+Jp2urB92BpmBUia8zKitVjOgnlbtqclLJY+dX+XL+uMvu2iI+EFxWAO4IaR/s +KM83gTm13Ej3KViT9QcS0wbWWXqQOIXyFZ8cN2S12qdC+4uRQwADnGAgV9f5QwG5 +NmTtWjCuOc66WM3l5U3z3LZ9hde2oLX3UeKUb1XAt7BgpB38iyFvinZZ/GIEhU75 +xCmdgHqq6+sRgLVc9xFsm5d2tzspxid1Z2iPoztpPXM6ccmt0u2/sXt2RiBX2+rA +3vDe53u41oRuwe9bPSL7qqNf+4Tvnk1mgnw7wPHm6D/WXGmWdeV+Zp0iJtg8Elq1 +GxKLAgMBAAGgADANBgkqhkiG9w0BAQsFAAOCAQEAmgj0lyN0I+pik9xQnmt7RhC1 +r+5ivX9ndnMmpeN8jI0RqUOEU3CewSsxKihiVpVHqUGJhHKJmsnEh/aiD2dPorK+ +I0NGWXGexk3TfHq/Ey1lwyZc1O9+vOYo/6k3zDhJZg0BekNkYciTsMFpI4h8cDr2 +yV3gzRdFPug2wwBPuKumiJuI6ZQU3G3FjgbUIOox91ZZctH1X3PRFmHjZKiHauwE +3FEzyoJUXPhP/HFGooZ6M81nm5VotozqUbj+pslLGjPdX2stduFfhZOriwH/mKll +7seOwR7GpqOhMDSCfs1gBAZkkyGX+z1hk+hccFJHSO0PLg+32Wtzu1kepBw4kA== +-----END CERTIFICATE REQUEST----- diff --git a/st2tests/st2tests/fixtures/ssl_certs/server/server_certificate.p12 b/st2tests/st2tests/fixtures/ssl_certs/server/server_certificate.p12 new file mode 100644 index 0000000000..7a937f220b Binary files /dev/null and b/st2tests/st2tests/fixtures/ssl_certs/server/server_certificate.p12 differ diff --git a/st2tests/st2tests/fixtures/ssl_certs/server/server_certificate.pem b/st2tests/st2tests/fixtures/ssl_certs/server/server_certificate.pem new file mode 100644 index 0000000000..17c4490f8b --- /dev/null +++ b/st2tests/st2tests/fixtures/ssl_certs/server/server_certificate.pem @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIIC4jCCAcqgAwIBAgIBATANBgkqhkiG9w0BAQsFADATMREwDwYDVQQDDAhNeVRl +c3RDQTAeFw0xOTAyMTIxNTU4MDdaFw0yNDAyMTExNTU4MDdaMCUxEjAQBgNVBAMM +CWxvY2FsaG9zdDEPMA0GA1UECgwGc2VydmVyMIIBIjANBgkqhkiG9w0BAQEFAAOC +AQ8AMIIBCgKCAQEAuLLUdbHqOsUiRnkv2S0fiadrqwfdgaZgVImvMyorVYzoJ5W7 +anJSyWPnV/ly/rjL7toiPhBcVgDuCGkf7CjPN4E5tdxI9ylYk/UHEtMG1ll6kDiF +8hWfHDdktdqnQvuLkUMAA5xgIFfX+UMBuTZk7VowrjnOuljN5eVN89y2fYXXtqC1 +91HilG9VwLewYKQd/Ishb4p2WfxiBIVO+cQpnYB6quvrEYC1XPcRbJuXdrc7KcYn +dWdoj6M7aT1zOnHJrdLtv7F7dkYgV9vqwN7w3ud7uNaEbsHvWz0i+6qjX/uE755N +ZoJ8O8Dx5ug/1lxplnXlfmadIibYPBJatRsSiwIDAQABoy8wLTAJBgNVHRMEAjAA +MAsGA1UdDwQEAwIFoDATBgNVHSUEDDAKBggrBgEFBQcDATANBgkqhkiG9w0BAQsF +AAOCAQEAnhmIUhZwweCqdzGNeoNXXkuXyBf2fFvajHlG2a2pZ8r6/fyQbbJgzo04 +ajjWoUoSW+XB+AfJvT6CTZuMWsGkxYvFAxOoXtLpW0OKqEh55q8diMSb/gOxxwND +vHVb1+VjZBhzxxt0TbXeFngMnBSgVhipKQe49pe0H+rDDYptultl81n2zFLzBKUe +h927CnTJ7cpZe4Di2tMJfVsDJB6piuwPu6GnWhT38Q12I+ryL2xbihIw1B4qDtq6 +nq4lYGnpJCNNXg5JR5S1HeYiQtP0sHgU6SvpgMtzDdbCJ0Nu7EpR5J3ChdQWooGf +uTOThX41qx1p47ho4TA9Ac4K/GRcLg== +-----END CERTIFICATE----- diff --git a/tools/config_gen.py b/tools/config_gen.py index 4f593b2389..c3e2c17b2f 100755 --- a/tools/config_gen.py +++ b/tools/config_gen.py @@ -41,7 +41,7 @@ SKIP_GROUPS = ['api_pecan', 'rbac', 'results_tracker'] -# We group auth options together to nake it a bit more clear what applies where +# We group auth options together to make it a bit more clear what applies where AUTH_OPTIONS = { 'common': [ 'enable', @@ -63,7 +63,7 @@ ] } -# Some of the config values change depenending on the environment where this script is ran so we +# Some of the config values change depending on the environment where this script is ran so we # set them to static values to ensure consistent and stable output STATIC_OPTION_VALUES = { 'actionrunner': { diff --git a/tools/migrate_messaging_setup.py b/tools/migrate_messaging_setup.py index ff2396f636..6232e70f7f 100755 --- a/tools/migrate_messaging_setup.py +++ b/tools/migrate_messaging_setup.py @@ -19,9 +19,9 @@ """ from __future__ import absolute_import + import traceback -from kombu import Connection from st2common import config from st2common.transport import reactor from st2common.transport import utils as transport_utils @@ -47,7 +47,7 @@ def migrate(self): self._cleanup_old_queues() def _cleanup_old_queues(self): - with Connection(transport_utils.get_messaging_urls()) as connection: + with transport_utils.get_connection() as connection: for q in self.OLD_QS: bound_q = q(connection.default_channel) try: diff --git a/tools/queue_consumer.py b/tools/queue_consumer.py index 5c7ef19989..91b1aad4cf 100755 --- a/tools/queue_consumer.py +++ b/tools/queue_consumer.py @@ -19,12 +19,13 @@ """ from __future__ import absolute_import + import random import argparse from pprint import pprint from kombu.mixins import ConsumerMixin -from kombu import Connection, Exchange, Queue +from kombu import Exchange, Queue from st2common import config from st2common.transport import utils as transport_utils @@ -59,7 +60,8 @@ def main(queue, exchange, routing_key='#'): queue = Queue(name=queue, exchange=exchange, routing_key=routing_key, auto_delete=True) - with Connection(transport_utils.get_messaging_urls()) as connection: + with transport_utils.get_connection() as connection: + connection.connect() watcher = QueueConsumer(connection=connection, queue=queue) watcher.run() diff --git a/tools/queue_producer.py b/tools/queue_producer.py index c9b01a47cd..9d98c9151e 100755 --- a/tools/queue_producer.py +++ b/tools/queue_producer.py @@ -19,20 +19,21 @@ """ from __future__ import absolute_import + import argparse +import eventlet from kombu import Exchange from st2common import config - -from st2common.transport import utils as transport_utils from st2common.transport.publishers import PoolPublisher def main(exchange, routing_key, payload): exchange = Exchange(exchange, type='topic') - publisher = PoolPublisher(urls=transport_utils.get_messaging_urls()) + publisher = PoolPublisher() publisher.publish(payload=payload, exchange=exchange, routing_key=routing_key) + eventlet.sleep(0.5) if __name__ == '__main__':