diff --git a/CHANGELOG.rst b/CHANGELOG.rst index d4390ba89d..5782c58096 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -17,6 +17,20 @@ Added For backward compatibility reasons, if pack metadata file doesn't contain that attribute, it's assumed it only works with Python 2. (new feature) #4474 +* Update service bootstrap code and make sure all the services register in a service registry once + they come online and become available. + + This functionality is only used internally and will only work if configuration backend is + correctly configured in ``st2.conf`` (new feature) #4548 +* Add new ``GET /v1/service_registry/groups`` and + ``GET /v1/service_registry/groups//members`` API endpoint for listing available service + registry groups and members. + + NOTE: This API endpoint is behind a RBAC control check and can only be views by the admins. + (new feature) #4548 + + Also add corresponding CLI commands - ``st2 service-registry group list``, + ``st2 service registry member list [--group-id=]`` * Adding ``Cache-Control`` header to all API responses so clients will favor refresh from API instead of using cached version. diff --git a/conf/st2.dev.conf b/conf/st2.dev.conf index 52f9ac61dc..8043dffb8f 100644 --- a/conf/st2.dev.conf +++ b/conf/st2.dev.conf @@ -77,6 +77,17 @@ port = 514 facility = local7 protocol = udp +[coordination] +# NOTE: Service registry / groups functionality is only available in the following drivers: +# - zookeeper +# - redis +# - etcd3 +# - etcd3gw +# Keep in mind that zake driver works in process so it won't work when testing cross process +# / cross server functionality +#url = redis://localhost +#url = kazoo://localhost + [webui] # webui_base_url = https://mywebhost.domain diff --git a/st2actions/st2actions/cmd/actionrunner.py b/st2actions/st2actions/cmd/actionrunner.py index d02e6ccd9a..9f1163d25f 100644 --- a/st2actions/st2actions/cmd/actionrunner.py +++ b/st2actions/st2actions/cmd/actionrunner.py @@ -33,8 +33,13 @@ def sigterm_handler(signum=None, frame=None): def _setup(): + capabilities = { + 'name': 'actionrunner', + 'type': 'passive' + } common_setup(service='actionrunner', config=config, setup_db=True, register_mq_exchanges=True, - register_signal_handlers=True) + register_signal_handlers=True, service_registry=True, capabilities=capabilities) + _setup_sigterm_handler() diff --git a/st2actions/st2actions/cmd/scheduler.py b/st2actions/st2actions/cmd/scheduler.py index 9e548d6ebb..b221fbfb15 100644 --- a/st2actions/st2actions/cmd/scheduler.py +++ b/st2actions/st2actions/cmd/scheduler.py @@ -46,8 +46,13 @@ def sigterm_handler(signum=None, frame=None): def _setup(): + capabilities = { + 'name': 'scheduler', + 'type': 'passive' + } common_setup(service='scheduler', config=config, setup_db=True, register_mq_exchanges=True, - register_signal_handlers=True) + register_signal_handlers=True, service_registry=True, capabilities=capabilities) + _setup_sigterm_handler() diff --git a/st2actions/st2actions/cmd/st2notifier.py b/st2actions/st2actions/cmd/st2notifier.py index b0146ea591..718a4ceefd 100644 --- a/st2actions/st2actions/cmd/st2notifier.py +++ b/st2actions/st2actions/cmd/st2notifier.py @@ -19,8 +19,12 @@ def _setup(): + capabilities = { + 'name': 'notifier', + 'type': 'passive' + } common_setup(service='notifier', config=config, setup_db=True, register_mq_exchanges=True, - register_signal_handlers=True) + register_signal_handlers=True, service_registry=True, capabilities=capabilities) def _run_worker(): diff --git a/st2actions/st2actions/cmd/st2resultstracker.py b/st2actions/st2actions/cmd/st2resultstracker.py index 312a2cd73a..5af5e151c2 100644 --- a/st2actions/st2actions/cmd/st2resultstracker.py +++ b/st2actions/st2actions/cmd/st2resultstracker.py @@ -20,8 +20,12 @@ def _setup(): - common_setup(service='resultstracker', config=config, setup_db=True, - register_mq_exchanges=True, register_signal_handlers=True) + capabilities = { + 'name': 'resultstracker', + 'type': 'passive' + } + common_setup(service='resultstracker', config=config, setup_db=True, register_mq_exchanges=True, + register_signal_handlers=True, service_registry=True, capabilities=capabilities) def _run_worker(): diff --git a/st2actions/st2actions/cmd/workflow_engine.py b/st2actions/st2actions/cmd/workflow_engine.py index 71a4ec6025..8720b77479 100644 --- a/st2actions/st2actions/cmd/workflow_engine.py +++ b/st2actions/st2actions/cmd/workflow_engine.py @@ -51,12 +51,18 @@ def sigterm_handler(signum=None, frame=None): def setup(): + capabilities = { + 'name': 'workflowengine', + 'type': 'passive' + } common_setup( service='workflow_engine', config=config, setup_db=True, register_mq_exchanges=True, - register_signal_handlers=True + register_signal_handlers=True, + service_registry=True, + capabilities=capabilities ) setup_sigterm_handler() diff --git a/st2actions/st2actions/scheduler/handler.py b/st2actions/st2actions/scheduler/handler.py index 74027b88ed..08491beb6a 100644 --- a/st2actions/st2actions/scheduler/handler.py +++ b/st2actions/st2actions/scheduler/handler.py @@ -61,7 +61,7 @@ def __init__(self): self.message_type = LiveActionDB self._shutdown = False self._pool = eventlet.GreenPool(size=cfg.CONF.scheduler.pool_size) - self._coordinator = coordination_service.get_coordinator() + self._coordinator = coordination_service.get_coordinator(start_heart=True) self._main_thread = None self._cleanup_thread = None diff --git a/st2actions/tests/unit/policies/test_concurrency.py b/st2actions/tests/unit/policies/test_concurrency.py index 301fae2b7d..f90fae2664 100644 --- a/st2actions/tests/unit/policies/test_concurrency.py +++ b/st2actions/tests/unit/policies/test_concurrency.py @@ -98,6 +98,7 @@ def setUpClass(cls): @classmethod def tearDownClass(cls): # Reset the coordinator. + coordination.coordinator_teardown(coordination.COORDINATOR) coordination.COORDINATOR = None super(ConcurrencyPolicyTestCase, cls).tearDownClass() diff --git a/st2actions/tests/unit/policies/test_concurrency_by_attr.py b/st2actions/tests/unit/policies/test_concurrency_by_attr.py index 0056b33a4f..81c722ed7a 100644 --- a/st2actions/tests/unit/policies/test_concurrency_by_attr.py +++ b/st2actions/tests/unit/policies/test_concurrency_by_attr.py @@ -97,6 +97,7 @@ def setUpClass(cls): @classmethod def tearDownClass(cls): # Reset the coordinator. + coordination.coordinator_teardown(coordination.COORDINATOR) coordination.COORDINATOR = None super(ConcurrencyByAttributePolicyTestCase, cls).tearDownClass() diff --git a/st2api/st2api/app.py b/st2api/st2api/app.py index 4f29fc9778..62ac304793 100644 --- a/st2api/st2api/app.py +++ b/st2api/st2api/app.py @@ -46,6 +46,13 @@ def setup_app(config={}): monkey_patch() st2api_config.register_opts() + capabilities = { + 'name': 'api', + 'listen_host': cfg.CONF.api.host, + 'listen_port': cfg.CONF.api.port, + 'type': 'active' + } + # This should be called in gunicorn case because we only want # workers to connect to db, rabbbitmq etc. In standalone HTTP # server case, this setup would have already occurred. @@ -54,6 +61,8 @@ def setup_app(config={}): register_signal_handlers=True, register_internal_trigger_types=True, run_migrations=True, + service_registry=True, + capabilities=capabilities, config_args=config.get('config_args', None)) # Additional pre-run time checks diff --git a/st2api/st2api/cmd/api.py b/st2api/st2api/cmd/api.py index c412dd3c33..6ae6ec877d 100644 --- a/st2api/st2api/cmd/api.py +++ b/st2api/st2api/cmd/api.py @@ -43,8 +43,16 @@ def _setup(): + capabilities = { + 'name': 'api', + 'listen_host': cfg.CONF.api.host, + 'listen_port': cfg.CONF.api.port, + 'type': 'active' + } + common_setup(service='api', config=config, setup_db=True, register_mq_exchanges=True, - register_signal_handlers=True, register_internal_trigger_types=True) + register_signal_handlers=True, register_internal_trigger_types=True, + service_registry=True, capabilities=capabilities) # Additional pre-run time checks validate_rbac_is_correctly_configured() diff --git a/st2api/st2api/controllers/v1/service_registry.py b/st2api/st2api/controllers/v1/service_registry.py new file mode 100644 index 0000000000..91acb36b60 --- /dev/null +++ b/st2api/st2api/controllers/v1/service_registry.py @@ -0,0 +1,77 @@ +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import six + +from tooz.coordination import GroupNotCreated + +from st2common.services import coordination +from st2common.exceptions.db import StackStormDBObjectNotFoundError +from st2common.rbac import utils as rbac_utils + +__all__ = [ + 'ServiceRegistryGroupsController', + 'ServiceRegistryGroupMembersController', +] + + +class ServiceRegistryGroupsController(object): + def get_all(self, requester_user): + rbac_utils.assert_user_is_admin(user_db=requester_user) + + coordinator = coordination.get_coordinator() + + group_ids = list(coordinator.get_groups().get()) + group_ids = [item.decode('utf-8') for item in group_ids] + + result = { + 'groups': group_ids + } + return result + + +class ServiceRegistryGroupMembersController(object): + def get_one(self, group_id, requester_user): + rbac_utils.assert_user_is_admin(user_db=requester_user) + + coordinator = coordination.get_coordinator() + + if not isinstance(group_id, six.binary_type): + group_id = group_id.encode('utf-8') + + try: + member_ids = list(coordinator.get_members(group_id).get()) + except GroupNotCreated: + msg = ('Group with ID "%s" not found.' % (group_id.decode('utf-8'))) + raise StackStormDBObjectNotFoundError(msg) + + result = { + 'members': [] + } + + for member_id in member_ids: + capabilities = coordinator.get_member_capabilities(group_id, member_id).get() + item = { + 'group_id': group_id.decode('utf-8'), + 'member_id': member_id.decode('utf-8'), + 'capabilities': capabilities + } + result['members'].append(item) + + return result + + +groups_controller = ServiceRegistryGroupsController() +members_controller = ServiceRegistryGroupMembersController() diff --git a/st2api/tests/unit/controllers/v1/test_rbac_for_supported_endpoints.py b/st2api/tests/unit/controllers/v1/test_rbac_for_supported_endpoints.py index 57f1c2bcdf..d7afcd718e 100644 --- a/st2api/tests/unit/controllers/v1/test_rbac_for_supported_endpoints.py +++ b/st2api/tests/unit/controllers/v1/test_rbac_for_supported_endpoints.py @@ -23,8 +23,12 @@ from st2api.controllers.v1.webhooks import HooksHolder from st2common.persistence.rbac import UserRoleAssignment from st2common.models.db.rbac import UserRoleAssignmentDB +from st2common.service_setup import register_service_in_service_registry +from st2common.services import coordination +from st2tests import config as tests_config from st2tests.fixturesloader import FixturesLoader + from tests.base import APIControllerWithRBACTestCase from tests.unit.controllers.v1.test_webhooks import DUMMY_TRIGGER_DICT @@ -110,6 +114,29 @@ class APIControllersRBACTestCase(APIControllerWithRBACTestCase): register_packs = True fixtures_loader = FixturesLoader() + coordinator = None + + @classmethod + def setUpClass(cls): + tests_config.parse_args(coordinator_noop=True) + + super(APIControllersRBACTestCase, cls).setUpClass() + + cls.coordinator = coordination.get_coordinator(use_cache=False) + + # Register mock service in the service registry for testing purposes + service = six.binary_type(six.text_type('mock_service').encode('ascii')) + register_service_in_service_registry(service=service, + capabilities={'key1': 'value1', + 'name': 'mock_service'}, + start_heart=True) + + @classmethod + def tearDownClass(cls): + super(APIControllersRBACTestCase, cls).tearDownClass() + + coordination.coordinator_teardown(cls.coordinator) + def setUp(self): super(APIControllersRBACTestCase, self).setUp() @@ -450,6 +477,17 @@ def test_api_endpoints_behind_rbac_wall(self): 'path': '/v1/rules/views', 'method': 'GET', 'is_getall': True + }, + # Service registry + { + 'path': '/v1/service_registry/groups', + 'method': 'GET', + 'is_getall': True + }, + { + 'path': '/v1/service_registry/groups/mock_service/members', + 'method': 'GET', + 'is_getall': True } ] diff --git a/st2api/tests/unit/controllers/v1/test_service_registry.py b/st2api/tests/unit/controllers/v1/test_service_registry.py new file mode 100644 index 0000000000..a31f0bd31e --- /dev/null +++ b/st2api/tests/unit/controllers/v1/test_service_registry.py @@ -0,0 +1,85 @@ +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from st2common.service_setup import register_service_in_service_registry +from st2common.util import system_info +from st2common.services.coordination import get_member_id +from st2common.services import coordination + +from st2tests import config as tests_config + +from tests.base import FunctionalTest + +__all__ = [ + 'ServiceyRegistryControllerTestCase' +] + + +class ServiceyRegistryControllerTestCase(FunctionalTest): + + coordinator = None + + @classmethod + def setUpClass(cls): + super(ServiceyRegistryControllerTestCase, cls).setUpClass() + + tests_config.parse_args(coordinator_noop=True) + + cls.coordinator = coordination.get_coordinator(use_cache=False) + + # NOTE: We mock call common_setup to emulate service being registered in the service + # registry during bootstrap phase + register_service_in_service_registry(service='mock_service', + capabilities={'key1': 'value1', + 'name': 'mock_service'}, + start_heart=True) + + @classmethod + def tearDownClass(cls): + super(ServiceyRegistryControllerTestCase, cls).tearDownClass() + + coordination.coordinator_teardown(cls.coordinator) + + def test_get_groups(self): + list_resp = self.app.get('/v1/service_registry/groups') + self.assertEqual(list_resp.status_int, 200) + self.assertEqual(list_resp.json, {'groups': ['mock_service']}) + + def test_get_group_members(self): + proc_info = system_info.get_process_info() + member_id = get_member_id() + + # 1. Group doesn't exist + resp = self.app.get('/v1/service_registry/groups/doesnt-exist/members', expect_errors=True) + self.assertEqual(resp.status_int, 404) + self.assertEqual(resp.json['faultstring'], 'Group with ID "doesnt-exist" not found.') + + # 2. Group exists and has a single member + resp = self.app.get('/v1/service_registry/groups/mock_service/members') + self.assertEqual(resp.status_int, 200) + self.assertEqual(resp.json, { + 'members': [ + { + 'group_id': 'mock_service', + 'member_id': member_id.decode('utf-8'), + 'capabilities': { + 'key1': 'value1', + 'name': 'mock_service', + 'hostname': proc_info['hostname'], + 'pid': proc_info['pid'] + } + } + ] + }) diff --git a/st2api/tests/unit/controllers/v1/test_service_registry_rbac.py b/st2api/tests/unit/controllers/v1/test_service_registry_rbac.py new file mode 100644 index 0000000000..c83de9749e --- /dev/null +++ b/st2api/tests/unit/controllers/v1/test_service_registry_rbac.py @@ -0,0 +1,91 @@ +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import six + +from st2common.service_setup import register_service_in_service_registry +from st2common.services import coordination + +from st2tests import config as tests_config + +from tests.base import APIControllerWithRBACTestCase + +http_client = six.moves.http_client + +__all__ = [ + 'ServiceRegistryControllerRBACTestCase' +] + + +class ServiceRegistryControllerRBACTestCase(APIControllerWithRBACTestCase): + + coordinator = None + + @classmethod + def setUpClass(cls): + tests_config.parse_args(coordinator_noop=True) + + super(ServiceRegistryControllerRBACTestCase, cls).setUpClass() + + cls.coordinator = coordination.get_coordinator(use_cache=False) + + # Register mock service in the service registry for testing purposes + register_service_in_service_registry(service='mock_service', + capabilities={'key1': 'value1', + 'name': 'mock_service'}, + start_heart=True) + + @classmethod + def tearDownClass(cls): + super(ServiceRegistryControllerRBACTestCase, cls).tearDownClass() + + coordination.coordinator_teardown(cls.coordinator) + + def test_get_groups(self): + # Non admin users can't access that API endpoint + for user_db in [self.users['no_permissions'], self.users['observer']]: + self.use_user(user_db) + + resp = self.app.get('/v1/service_registry/groups', expect_errors=True) + expected_msg = ('Administrator access required') + self.assertEqual(resp.status_code, http_client.FORBIDDEN) + self.assertEqual(resp.json['faultstring'], expected_msg) + + # Admin user can access it + user_db = self.users['admin'] + self.use_user(user_db) + + resp = self.app.get('/v1/service_registry/groups') + self.assertEqual(resp.status_code, http_client.OK) + self.assertEqual(resp.json, {'groups': ['mock_service']}) + + def test_get_group_members(self): + # Non admin users can't access that API endpoint + for user_db in [self.users['no_permissions'], self.users['observer']]: + self.use_user(user_db) + + resp = self.app.get('/v1/service_registry/groups/mock_service/members', + expect_errors=True) + expected_msg = ('Administrator access required') + self.assertEqual(resp.status_code, http_client.FORBIDDEN) + self.assertEqual(resp.json['faultstring'], expected_msg) + + # Admin user can access it + user_db = self.users['admin'] + self.use_user(user_db) + + resp = self.app.get('/v1/service_registry/groups/mock_service/members') + self.assertEqual(resp.status_code, http_client.OK) + self.assertTrue('members' in resp.json) diff --git a/st2auth/st2auth/app.py b/st2auth/st2auth/app.py index 0dd154bb11..8fd6e6d610 100644 --- a/st2auth/st2auth/app.py +++ b/st2auth/st2auth/app.py @@ -43,15 +43,25 @@ def setup_app(config={}): # including shutdown monkey_patch() + st2auth_config.register_opts() + capabilities = { + 'name': 'auth', + 'listen_host': cfg.CONF.auth.host, + 'listen_port': cfg.CONF.auth.port, + 'listen_ssl': cfg.CONF.auth.use_ssl, + 'type': 'active' + } + # This should be called in gunicorn case because we only want # workers to connect to db, rabbbitmq etc. In standalone HTTP # server case, this setup would have already occurred. - st2auth_config.register_opts() common_setup(service='auth', config=st2auth_config, setup_db=True, register_mq_exchanges=False, register_signal_handlers=True, register_internal_trigger_types=False, run_migrations=False, + service_registry=True, + capabilities=capabilities, config_args=config.get('config_args', None)) # Additional pre-run time checks diff --git a/st2auth/st2auth/cmd/api.py b/st2auth/st2auth/cmd/api.py index 74d1536252..01fee95007 100644 --- a/st2auth/st2auth/cmd/api.py +++ b/st2auth/st2auth/cmd/api.py @@ -41,9 +41,16 @@ def _setup(): + capabilities = { + 'name': 'auth', + 'listen_host': cfg.CONF.auth.host, + 'listen_port': cfg.CONF.auth.port, + 'listen_ssl': cfg.CONF.auth.use_ssl, + 'type': 'active' + } common_setup(service='auth', config=config, setup_db=True, register_mq_exchanges=False, register_signal_handlers=True, register_internal_trigger_types=False, - run_migrations=False) + run_migrations=False, service_registry=True, capabilities=capabilities) # Additional pre-run time checks validate_auth_backend_is_correctly_configured() diff --git a/st2client/st2client/client.py b/st2client/st2client/client.py index a176658056..4297453957 100644 --- a/st2client/st2client/client.py +++ b/st2client/st2client/client.py @@ -34,6 +34,8 @@ from st2client.models.core import WebhookManager from st2client.models.core import StreamManager from st2client.models.core import WorkflowManager +from st2client.models.core import ServiceRegistryGroupsManager +from st2client.models.core import ServiceRegistryMembersManager from st2client.models.core import add_auth_token_to_kwargs_from_env @@ -168,6 +170,15 @@ def __init__(self, base_url=None, auth_url=None, api_url=None, stream_url=None, self.managers['Workflow'] = WorkflowManager( self.endpoints['api'], cacert=self.cacert, debug=self.debug) + # Service Registry + self.managers['ServiceRegistryGroups'] = ServiceRegistryGroupsManager( + models.ServiceRegistryGroup, self.endpoints['api'], cacert=self.cacert, + debug=self.debug) + + self.managers['ServiceRegistryMembers'] = ServiceRegistryMembersManager( + models.ServiceRegistryMember, self.endpoints['api'], cacert=self.cacert, + debug=self.debug) + # RBAC self.managers['Role'] = ResourceManager( models.Role, self.endpoints['api'], cacert=self.cacert, debug=self.debug) diff --git a/st2client/st2client/commands/service_registry.py b/st2client/st2client/commands/service_registry.py new file mode 100644 index 0000000000..02ba52778c --- /dev/null +++ b/st2client/st2client/commands/service_registry.py @@ -0,0 +1,112 @@ +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import + +from st2client.models.service_registry import ServiceRegistryGroup +from st2client.models.service_registry import ServiceRegistryMember +from st2client import commands +from st2client.commands.noop import NoopCommand +from st2client.commands import resource + + +class ServiceRegistryBranch(commands.Branch): + def __init__(self, description, app, subparsers, parent_parser=None): + super(ServiceRegistryBranch, self).__init__( + 'service-registry', description, + app, subparsers, parent_parser=parent_parser) + + self.subparsers = self.parser.add_subparsers( + help=('List of commands for managing service registry.')) + + # Instantiate commands + args_groups = ['Manage service registry groups', self.app, self.subparsers] + args_members = ['Manage service registry members', self.app, self.subparsers] + + self.commands['groups'] = ServiceRegistryGroupsBranch(*args_groups) + self.commands['members'] = ServiceRegistryMembersBranch(*args_members) + + +class ServiceRegistryGroupsBranch(resource.ResourceBranch): + def __init__(self, description, app, subparsers, parent_parser=None): + super(ServiceRegistryGroupsBranch, self).__init__( + ServiceRegistryGroup, description, app, subparsers, + parent_parser=parent_parser, + read_only=True, + commands={ + 'list': ServiceRegistryListGroupsCommand, + 'get': NoopCommand + }) + + del self.commands['get'] + + +class ServiceRegistryMembersBranch(resource.ResourceBranch): + def __init__(self, description, app, subparsers, parent_parser=None): + super(ServiceRegistryMembersBranch, self).__init__( + ServiceRegistryMember, description, app, subparsers, + parent_parser=parent_parser, + read_only=True, + commands={ + 'list': ServiceRegistryListMembersCommand, + 'get': NoopCommand + }) + + del self.commands['get'] + + +class ServiceRegistryListGroupsCommand(resource.ResourceListCommand): + display_attributes = ['group_id'] + attribute_display_order = ['group_id'] + + @resource.add_auth_token_to_kwargs_from_cli + def run(self, args, **kwargs): + manager = self.app.client.managers['ServiceRegistryGroups'] + + groups = manager.list() + return groups + + +class ServiceRegistryListMembersCommand(resource.ResourceListCommand): + display_attributes = ['group_id', 'member_id', 'capabilities'] + attribute_display_order = ['group_id', 'member_id', 'capabilities'] + + def __init__(self, resource, *args, **kwargs): + super(ServiceRegistryListMembersCommand, self).__init__( + resource, *args, **kwargs + ) + + self.parser.add_argument('--group-id', dest='group_id', default=None, + help='If provided only retrieve members for the specified group.') + + @resource.add_auth_token_to_kwargs_from_cli + def run(self, args, **kwargs): + groups_manager = self.app.client.managers['ServiceRegistryGroups'] + members_manager = self.app.client.managers['ServiceRegistryMembers'] + + # If group ID is provided only retrieve members for that group, otherwise retrieve members + # for all groups + if args.group_id: + members = members_manager.list(args.group_id) + return members + else: + groups = groups_manager.list() + + result = [] + for group in groups: + members = members_manager.list(group.group_id) + result.extend(members) + + return result diff --git a/st2client/st2client/models/__init__.py b/st2client/st2client/models/__init__.py index dabce91510..0ce9c7b4c6 100644 --- a/st2client/st2client/models/__init__.py +++ b/st2client/st2client/models/__init__.py @@ -29,4 +29,5 @@ from st2client.models.trace import * # noqa from st2client.models.webhook import * # noqa from st2client.models.timer import * # noqa +from st2client.models.service_registry import * # noqa from st2client.models.rbac import * # noqa diff --git a/st2client/st2client/models/core.py b/st2client/st2client/models/core.py index b6c922ef8b..264e3159d4 100644 --- a/st2client/st2client/models/core.py +++ b/st2client/st2client/models/core.py @@ -682,3 +682,51 @@ def inspect(self, definition, **kwargs): self.handle_error(response) return response.json() + + +class ServiceRegistryGroupsManager(ResourceManager): + @add_auth_token_to_kwargs_from_env + def list(self, **kwargs): + url = '/service_registry/groups' + + headers = {} + response = self.client.get(url, headers=headers, **kwargs) + + if response.status_code != http_client.OK: + self.handle_error(response) + + groups = response.json()['groups'] + + result = [] + for group in groups: + item = self.resource.deserialize({'group_id': group}) + result.append(item) + + return result + + +class ServiceRegistryMembersManager(ResourceManager): + + @add_auth_token_to_kwargs_from_env + def list(self, group_id, **kwargs): + url = '/service_registry/groups/%s/members' % (group_id) + + headers = {} + response = self.client.get(url, headers=headers, **kwargs) + + if response.status_code != http_client.OK: + self.handle_error(response) + + members = response.json()['members'] + + result = [] + for member in members: + data = { + 'group_id': group_id, + 'member_id': member['member_id'], + 'capabilities': member['capabilities'] + } + item = self.resource.deserialize(data) + result.append(item) + + return result diff --git a/st2client/st2client/models/service_registry.py b/st2client/st2client/models/service_registry.py new file mode 100644 index 0000000000..6cf7206de0 --- /dev/null +++ b/st2client/st2client/models/service_registry.py @@ -0,0 +1,48 @@ +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import + +from st2client.models import core + +__all__ = [ + 'ServiceRegistry', + + 'ServiceRegistryGroup', + 'ServiceRegistryMember' +] + + +class ServiceRegistry(core.Resource): + _alias = 'service-registry' + _display_name = 'Service Registry' + _plural = 'Service Registry' + _plural_display_name = 'Service Registry' + + +class ServiceRegistryGroup(core.Resource): + _alias = 'group' + _display_name = 'Group' + _plural = 'Groups' + _plural_display_name = 'Groups' + _repr_attributes = ['group_id'] + + +class ServiceRegistryMember(core.Resource): + _alias = 'member' + _display_name = 'Group Member' + _plural = 'Group Members' + _plural_display_name = 'Group Members' + _repr_attributes = ['group_id', 'member_id'] diff --git a/st2client/st2client/shell.py b/st2client/st2client/shell.py index fdf0a0c160..2bf62d9ed8 100755 --- a/st2client/st2client/shell.py +++ b/st2client/st2client/shell.py @@ -58,6 +58,7 @@ from st2client.commands import rule_enforcement from st2client.commands import rbac from st2client.commands import workflow +from st2client.commands import service_registry from st2client.config import set_config from st2client.exceptions.operations import OperationFailureException from st2client.utils.logging import LogLevelFilter, set_log_level_for_all_loggers @@ -338,6 +339,11 @@ def __init__(self): 'Only orquesta workflows are supported.', self, self.subparsers) + # Service Registry + self.commands['service-registry'] = service_registry.ServiceRegistryBranch( + 'Service registry group and membership related commands.', + self, self.subparsers) + # RBAC self.commands['role'] = rbac.RoleBranch( 'RBAC roles.', diff --git a/st2common/st2common/openapi.yaml b/st2common/st2common/openapi.yaml index 82e914f8bf..aa92aace65 100644 --- a/st2common/st2common/openapi.yaml +++ b/st2common/st2common/openapi.yaml @@ -4300,6 +4300,49 @@ paths: description: Unexpected error schema: $ref: '#/definitions/Error' + /api/v1/service_registry/groups: + get: + operationId: st2api.controllers.v1.service_registry:groups_controller.get_all + description: Retrieve available service registry groups. + x-parameters: + - name: user + in: context + x-as: requester_user + description: User performing the operation. + responses: + '200': + description: List of available groups. + schema: + type: object + default: + description: Unexpected error + schema: + $ref: '#/definitions/Error' + /api/v1/service_registry/groups/{group_id}/members: + get: + operationId: st2api.controllers.v1.service_registry:members_controller.get_one + description: Retrieve active / available members for a particular group. + parameters: + - name: group_id + in: path + description: Group ID. + type: string + required: true + x-parameters: + - name: user + in: context + x-as: requester_user + description: User performing the operation. + responses: + '200': + description: List of available members. + schema: + type: object + default: + description: Unexpected error + schema: + $ref: '#/definitions/Error' + /auth/v1/tokens: post: diff --git a/st2common/st2common/openapi.yaml.j2 b/st2common/st2common/openapi.yaml.j2 index b7f1d56768..b3b4592fc1 100644 --- a/st2common/st2common/openapi.yaml.j2 +++ b/st2common/st2common/openapi.yaml.j2 @@ -4296,6 +4296,49 @@ paths: description: Unexpected error schema: $ref: '#/definitions/Error' + /api/v1/service_registry/groups: + get: + operationId: st2api.controllers.v1.service_registry:groups_controller.get_all + description: Retrieve available service registry groups. + x-parameters: + - name: user + in: context + x-as: requester_user + description: User performing the operation. + responses: + '200': + description: List of available groups. + schema: + type: object + default: + description: Unexpected error + schema: + $ref: '#/definitions/Error' + /api/v1/service_registry/groups/{group_id}/members: + get: + operationId: st2api.controllers.v1.service_registry:members_controller.get_one + description: Retrieve active / available members for a particular group. + parameters: + - name: group_id + in: path + description: Group ID. + type: string + required: true + x-parameters: + - name: user + in: context + x-as: requester_user + description: User performing the operation. + responses: + '200': + description: List of available members. + schema: + type: object + default: + description: Unexpected error + schema: + $ref: '#/definitions/Error' + /auth/v1/tokens: post: diff --git a/st2common/st2common/policies/concurrency.py b/st2common/st2common/policies/concurrency.py index 883721b628..52289f002d 100644 --- a/st2common/st2common/policies/concurrency.py +++ b/st2common/st2common/policies/concurrency.py @@ -30,7 +30,7 @@ def __init__(self, policy_ref, policy_type, threshold=0, action='delay'): self.threshold = threshold self.policy_action = action - self.coordinator = coordination.get_coordinator() + self.coordinator = coordination.get_coordinator(start_heart=True) def _get_status_for_policy_action(self, action): if action == 'delay': diff --git a/st2common/st2common/service_setup.py b/st2common/st2common/service_setup.py index 90717798f6..3c1002a0db 100644 --- a/st2common/st2common/service_setup.py +++ b/st2common/st2common/service_setup.py @@ -26,6 +26,7 @@ import six from oslo_config import cfg +from tooz.coordination import GroupAlreadyExist from st2common import log as logging from st2common.constants.logging import DEFAULT_LOGGING_CONF_PATH @@ -38,6 +39,8 @@ from st2common import triggers from st2common.rbac.migrations import run_all as run_all_rbac_migrations from st2common.logging.filters import LogLevelFilter +from st2common.util import system_info +from st2common.services import coordination from st2common.logging.misc import add_global_filters_for_all_loggers # Note: This is here for backward compatibility. @@ -53,7 +56,9 @@ 'teardown', 'db_setup', - 'db_teardown' + 'db_teardown', + + 'register_service_in_service_registry' ] LOG = logging.getLogger(__name__) @@ -61,7 +66,8 @@ def setup(service, config, setup_db=True, register_mq_exchanges=True, register_signal_handlers=True, register_internal_trigger_types=False, - run_migrations=True, register_runners=True, config_args=None): + run_migrations=True, register_runners=True, service_registry=False, + capabilities=None, config_args=None): """ Common setup function. @@ -75,10 +81,13 @@ def setup(service, config, setup_db=True, register_mq_exchanges=True, 5. Registers common signal handlers 6. Register internal trigger types 7. Register all the runners which are installed inside StackStorm virtualenv. + 8. Register service in the service registry with the provided capabilities :param service: Name of the service. :param config: Config object to use to parse args. """ + capabilities = capabilities or {} + # Set up logger which logs everything which happens during and before config # parsing to sys.stdout logging.setup(DEFAULT_LOGGING_CONF_PATH, excludes=None) @@ -171,9 +180,57 @@ def setup(service, config, setup_db=True, register_mq_exchanges=True, metrics_initialize() + # Register service in the service registry + if service_registry: + # NOTE: It's important that we pass start_heart=True to start the hearbeat process + register_service_in_service_registry(service=service, capabilities=capabilities, + start_heart=True) + def teardown(): """ Common teardown function. """ + # 1. Tear down the database db_teardown() + + # 2. Tear down the coordinator + coordinator = coordination.get_coordinator_if_set() + coordination.coordinator_teardown(coordinator) + + +def register_service_in_service_registry(service, capabilities=None, start_heart=True): + """ + Register provided service in the service registry and start the heartbeat process. + + :param service: Service name which will also be used for a group name (e.g. "api"). + :type service: ``str`` + + :param capabilities: Optional metadata associated with the service. + :type capabilities: ``dict`` + """ + # NOTE: It's important that we pass start_heart=True to start the hearbeat process + coordinator = coordination.get_coordinator(start_heart=start_heart) + + member_id = coordination.get_member_id() + + # 1. Create a group with the name of the service + if not isinstance(service, six.binary_type): + group_id = service.encode('utf-8') + else: + group_id = service + + try: + coordinator.create_group(group_id).get() + except GroupAlreadyExist: + pass + + # Include common capabilities such as hostname and process ID + proc_info = system_info.get_process_info() + capabilities['hostname'] = proc_info['hostname'] + capabilities['pid'] = proc_info['pid'] + + # 1. Join the group as a member + LOG.debug('Joining service registry group "%s" as member_id "%s" with capabilities "%s"' % + (group_id, member_id, capabilities)) + return coordinator.join_group(group_id, capabilities=capabilities).get() diff --git a/st2common/st2common/services/coordination.py b/st2common/st2common/services/coordination.py index e7dcd41937..477330bdda 100644 --- a/st2common/st2common/services/coordination.py +++ b/st2common/st2common/services/coordination.py @@ -16,9 +16,11 @@ from __future__ import absolute_import import six + from oslo_config import cfg from tooz import coordination from tooz import locking +from tooz.coordination import GroupNotCreated from st2common import log as logging from st2common.util import system_info @@ -30,7 +32,10 @@ __all__ = [ 'configured', + 'get_coordinator', + 'get_coordinator_if_set', + 'get_member_id', 'coordinator_setup', 'coordinator_teardown' @@ -51,6 +56,18 @@ def heartbeat(self): return True +class NoOpAsyncResult(object): + """ + In most scenarios, tooz library returns an async result, a future and this + class wrapper is here to correctly mimic tooz API and behavior. + """ + def __init__(self, result=None): + self._result = result + + def get(self): + return self._result + + class NoOpDriver(coordination.CoordinationDriver): """ Tooz driver where each operation is a no-op. @@ -58,9 +75,15 @@ class NoOpDriver(coordination.CoordinationDriver): This driver is used if coordination service is not configured. """ + groups = {} + def __init__(self, member_id, parsed_url=None, options=None): super(NoOpDriver, self).__init__(member_id, parsed_url, options) + @classmethod + def stop(cls): + cls.groups = {} + def watch_join_group(self, group_id, callback): self._hooks_join_group[group_id].append(callback) @@ -83,33 +106,47 @@ def unwatch_elected_as_leader(self, group_id, callback): def stand_down_group_leader(group_id): return None - @staticmethod - def create_group(group_id): - return None + @classmethod + def create_group(cls, group_id): + cls.groups[group_id] = {'members': {}} + return NoOpAsyncResult() - @staticmethod - def get_groups(): - return None + @classmethod + def get_groups(cls): + return NoOpAsyncResult(result=cls.groups.keys()) - @staticmethod - def join_group(group_id, capabilities=''): - return None + @classmethod + def join_group(cls, group_id, capabilities=''): + member_id = get_member_id() - @staticmethod - def leave_group(group_id): - return None + cls.groups[group_id]['members'][member_id] = {'capabilities': capabilities} + return NoOpAsyncResult() - @staticmethod - def delete_group(group_id): - return None + @classmethod + def leave_group(cls, group_id): + member_id = get_member_id() - @staticmethod - def get_members(group_id): - return None + del cls.groups[group_id]['members'][member_id] + return NoOpAsyncResult() - @staticmethod - def get_member_capabilities(group_id, member_id): - return None + @classmethod + def delete_group(cls, group_id): + del cls.groups[group_id] + return NoOpAsyncResult() + + @classmethod + def get_members(cls, group_id): + try: + member_ids = cls.groups[group_id]['members'].keys() + except KeyError: + raise GroupNotCreated('Group doesnt exist') + + return NoOpAsyncResult(result=member_ids) + + @classmethod + def get_member_capabilities(cls, group_id, member_id): + member_capabiliteis = cls.groups[group_id]['members'][member_id]['capabilities'] + return NoOpAsyncResult(result=member_capabiliteis) @staticmethod def update_capabilities(group_id, capabilities): @@ -137,7 +174,7 @@ def configured(): return backend_configured and not mock_backend -def coordinator_setup(): +def coordinator_setup(start_heart=True): """ Sets up the client for the coordination service. @@ -149,8 +186,7 @@ def coordinator_setup(): """ url = cfg.CONF.coordination.url lock_timeout = cfg.CONF.coordination.lock_timeout - proc_info = system_info.get_process_info() - member_id = six.b('%s_%d' % (proc_info['hostname'], proc_info['pid'])) + member_id = get_member_id() if url: coordinator = coordination.get_coordinator(url, member_id, lock_timeout=lock_timeout) @@ -161,22 +197,55 @@ def coordinator_setup(): # to work coordinator = NoOpDriver(member_id) - coordinator.start() + coordinator.start(start_heart=start_heart) return coordinator -def coordinator_teardown(coordinator): - coordinator.stop() +def coordinator_teardown(coordinator=None): + if coordinator: + coordinator.stop() + +def get_coordinator(start_heart=True, use_cache=True): + """ + :param start_heart: True to start heartbeating process. + :type start_heart: ``bool`` -def get_coordinator(): + :param use_cache: True to use cached coordinator instance. False should only be used in tests. + :type use_cache: ``bool`` + """ global COORDINATOR if not configured(): LOG.warn('Coordination backend is not configured. Code paths which use coordination ' 'service will use best effort approach and race conditions are possible.') + if not use_cache: + return coordinator_setup(start_heart=start_heart) + if not COORDINATOR: - COORDINATOR = coordinator_setup() + COORDINATOR = coordinator_setup(start_heart=start_heart) + LOG.debug('Initializing and caching new coordinator instance: %s' % (str(COORDINATOR))) + else: + LOG.debug('Using cached coordinator instance: %s' % (str(COORDINATOR))) return COORDINATOR + + +def get_coordinator_if_set(): + """ + Return a coordinator instance if one has been initialized, None otherwise. + """ + global COORDINATOR + return COORDINATOR + + +def get_member_id(): + """ + Retrieve member if for the current process. + + :rtype: ``bytes`` + """ + proc_info = system_info.get_process_info() + member_id = six.b('%s_%d' % (proc_info['hostname'], proc_info['pid'])) + return member_id diff --git a/st2common/st2common/services/workflows.py b/st2common/st2common/services/workflows.py index f69c963f91..ddc8128ee1 100644 --- a/st2common/st2common/services/workflows.py +++ b/st2common/st2common/services/workflows.py @@ -727,7 +727,7 @@ def handle_action_execution_completion(ac_ex_db): task_ex_id = ac_ex_db.context['orquesta']['task_execution_id'] # Acquire lock before write operations. - with coord_svc.get_coordinator().get_lock(wf_ex_id): + with coord_svc.get_coordinator(start_heart=True).get_lock(wf_ex_id): # Get execution records for logging purposes. wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(wf_ex_id) task_ex_db = wf_db_access.TaskExecution.get_by_id(task_ex_id) diff --git a/st2common/tests/unit/services/test_synchronization.py b/st2common/tests/unit/services/test_synchronization.py index 5337860ba8..a68adf9c51 100644 --- a/st2common/tests/unit/services/test_synchronization.py +++ b/st2common/tests/unit/services/test_synchronization.py @@ -29,8 +29,8 @@ class SynchronizationTest(unittest2.TestCase): @classmethod def setUpClass(cls): super(SynchronizationTest, cls).setUpClass() - tests_config.parse_args() - cls.coordinator = coordination.get_coordinator() + tests_config.parse_args(coordinator_noop=False) + cls.coordinator = coordination.get_coordinator(use_cache=False) @classmethod def tearDownClass(cls): diff --git a/st2reactor/st2reactor/cmd/garbagecollector.py b/st2reactor/st2reactor/cmd/garbagecollector.py index b425172e33..3e42c0b629 100644 --- a/st2reactor/st2reactor/cmd/garbagecollector.py +++ b/st2reactor/st2reactor/cmd/garbagecollector.py @@ -14,6 +14,7 @@ # limitations under the License. from __future__ import absolute_import + import os import sys @@ -40,9 +41,14 @@ def _setup(): + capabilities = { + 'name': 'garbagecollector', + 'type': 'passive' + } common_setup(service='garbagecollector', config=config, setup_db=True, register_mq_exchanges=True, register_signal_handlers=True, - register_runners=False) + register_runners=False, service_registry=True, + capabilities=capabilities) def _teardown(): diff --git a/st2reactor/st2reactor/cmd/rulesengine.py b/st2reactor/st2reactor/cmd/rulesengine.py index 22ae356648..a8a9c2347c 100644 --- a/st2reactor/st2reactor/cmd/rulesengine.py +++ b/st2reactor/st2reactor/cmd/rulesengine.py @@ -33,9 +33,13 @@ def _setup(): + capabilities = { + 'name': 'rulesengine', + 'type': 'passive' + } common_setup(service='rulesengine', config=config, setup_db=True, register_mq_exchanges=True, register_signal_handlers=True, register_internal_trigger_types=True, - register_runners=False) + register_runners=False, service_registry=True, capabilities=capabilities) def _teardown(): diff --git a/st2reactor/st2reactor/cmd/sensormanager.py b/st2reactor/st2reactor/cmd/sensormanager.py index 0fbada701e..d066cff228 100644 --- a/st2reactor/st2reactor/cmd/sensormanager.py +++ b/st2reactor/st2reactor/cmd/sensormanager.py @@ -42,9 +42,13 @@ def _setup(): + capabilities = { + 'name': 'sensorcontainer', + 'type': 'passive' + } common_setup(service='sensorcontainer', config=config, setup_db=True, register_mq_exchanges=True, register_signal_handlers=True, - register_runners=False) + register_runners=False, service_registry=True, capabilities=capabilities) def _teardown(): diff --git a/st2reactor/st2reactor/cmd/timersengine.py b/st2reactor/st2reactor/cmd/timersengine.py index acf1e6932e..998eeaecc0 100644 --- a/st2reactor/st2reactor/cmd/timersengine.py +++ b/st2reactor/st2reactor/cmd/timersengine.py @@ -36,8 +36,12 @@ def _setup(): + capabilities = { + 'name': 'timerengine', + 'type': 'passive' + } common_setup(service='timer_engine', config=config, setup_db=True, register_mq_exchanges=True, - register_signal_handlers=True) + register_signal_handlers=True, service_registry=True, capabilities=capabilities) def _teardown(): diff --git a/st2stream/st2stream/app.py b/st2stream/st2stream/app.py index 588aaf026a..83579ec32e 100644 --- a/st2stream/st2stream/app.py +++ b/st2stream/st2stream/app.py @@ -53,6 +53,12 @@ def setup_app(config={}): monkey_patch() st2stream_config.register_opts() + capabilities = { + 'name': 'stream', + 'listen_host': cfg.CONF.stream.host, + 'listen_port': cfg.CONF.stream.port, + 'type': 'active' + } # This should be called in gunicorn case because we only want # workers to connect to db, rabbbitmq etc. In standalone HTTP # server case, this setup would have already occurred. @@ -61,6 +67,8 @@ def setup_app(config={}): register_signal_handlers=True, register_internal_trigger_types=False, run_migrations=False, + service_registry=True, + capabilities=capabilities, config_args=config.get('config_args', None)) router = Router(debug=cfg.CONF.stream.debug, auth=cfg.CONF.auth.enable, diff --git a/st2stream/st2stream/cmd/api.py b/st2stream/st2stream/cmd/api.py index 55e48f4648..ab9741de3c 100644 --- a/st2stream/st2stream/cmd/api.py +++ b/st2stream/st2stream/cmd/api.py @@ -49,9 +49,15 @@ def _setup(): + capabilities = { + 'name': 'stream', + 'listen_host': cfg.CONF.stream.host, + 'listen_port': cfg.CONF.stream.port, + 'type': 'active' + } common_setup(service='stream', config=config, setup_db=True, register_mq_exchanges=True, register_signal_handlers=True, register_internal_trigger_types=False, - run_migrations=False) + run_migrations=False, service_registry=True, capabilities=capabilities) def _run_server(): diff --git a/st2tests/st2tests/config.py b/st2tests/st2tests/config.py index c8cda2cd86..3c1f07b263 100644 --- a/st2tests/st2tests/config.py +++ b/st2tests/st2tests/config.py @@ -28,12 +28,12 @@ LOG = logging.getLogger(__name__) -def parse_args(coordinator_noop=False): +def parse_args(coordinator_noop=True): _setup_config_opts(coordinator_noop=coordinator_noop) CONF(args=[]) -def _setup_config_opts(coordinator_noop=False): +def _setup_config_opts(coordinator_noop=True): cfg.CONF.reset() try: diff --git a/tools/launchdev.sh b/tools/launchdev.sh index 1f37336ea8..7bd5f64df4 100755 --- a/tools/launchdev.sh +++ b/tools/launchdev.sh @@ -68,6 +68,7 @@ function init(){ fi VIRTUALENV=${VIRTUALENV_DIR:-${ST2_REPO}/virtualenv} + VIRTUALENV=$(readlink -f ${VIRTUALENV}) PY=${VIRTUALENV}/bin/python PYTHON_VERSION=$(${PY} --version 2>&1) @@ -77,6 +78,8 @@ function init(){ if [ -z "$ST2_CONF" ]; then ST2_CONF=${ST2_REPO}/conf/st2.dev.conf fi + + ST2_CONF=$(readlink -f ${ST2_CONF}) echo "Using st2 config file: $ST2_CONF" if [ ! -f "$ST2_CONF" ]; then diff --git a/tools/list_group_members.py b/tools/list_group_members.py new file mode 100755 index 0000000000..c5db38acea --- /dev/null +++ b/tools/list_group_members.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import + +from oslo_config import cfg + +from st2common import config +from st2common.services import coordination + +""" +Tool which lists all the services registered in the service registry and their capabilities. +""" + + +def main(group_id=None): + coordinator = coordination.get_coordinator() + + if not group_id: + group_ids = list(coordinator.get_groups().get()) + group_ids = [item.decode('utf-8') for item in group_ids] + + print('Available groups (%s):' % (len(group_ids))) + for group_id in group_ids: + print(' - %s' % (group_id)) + print('') + else: + group_ids = [group_id] + + for group_id in group_ids: + member_ids = list(coordinator.get_members(group_id).get()) + member_ids = [member_id.decode('utf-8') for member_id in member_ids] + + print('Members in group "%s" (%s):' % (group_id, len(member_ids))) + + for member_id in member_ids: + capabilities = coordinator.get_member_capabilities(group_id, member_id).get() + print(' - %s (capabilities=%s)' % (member_id, str(capabilities))) + + +def do_register_cli_opts(opts, ignore_errors=False): + for opt in opts: + try: + cfg.CONF.register_cli_opt(opt) + except: + if not ignore_errors: + raise + + +if __name__ == '__main__': + cli_opts = [ + cfg.StrOpt('group-id', default=None, + help='If provided, only list members for that group.'), + + ] + do_register_cli_opts(cli_opts) + config.parse_args() + + main(group_id=cfg.CONF.group_id)