From c7e8ce664ae4fa104b415dcef66fd8810cf8160a Mon Sep 17 00:00:00 2001 From: Stefan Gusa Date: Fri, 11 Oct 2019 12:23:35 +0300 Subject: [PATCH 1/8] Multiaccount SQS sensor capability --- aws.yaml.example | 11 +- config.schema.yaml | 7 +- sensors/sqs_sensor.py | 157 ++++++++++++++++----- sensors/sqs_sensor.yaml | 4 + tests/fixtures/blank.yaml | 2 +- tests/fixtures/full.yaml | 3 +- tests/fixtures/mixed.yaml | 20 +++ tests/fixtures/multiaccount.yaml | 19 +++ tests/test_sensor_sqs.py | 227 ++++++++++++++++++++++++++++--- 9 files changed, 393 insertions(+), 57 deletions(-) create mode 100644 tests/fixtures/mixed.yaml create mode 100644 tests/fixtures/multiaccount.yaml diff --git a/aws.yaml.example b/aws.yaml.example index 44d46431..0b0a43e4 100755 --- a/aws.yaml.example +++ b/aws.yaml.example @@ -11,8 +11,17 @@ service_notifications_sensor: path: /my-path sqs_sensor: + roles_arns: + - arn:aws:iam::123456789098:role/rolename1 + - arn:aws:iam::901234567812:role/rolename2 + - arn:aws:iam::567890123489:role/rolename3 input_queues: - - queuename + - queue_name_1 + - https://sqs.us-east-1.amazonaws.com/567890123489/queue_name_2 + - https://sqs.us-west-2.amazonaws.com/123456789098/queue_name_3 + - https://sqs.us-west-2.amazonaws.com/123456789098/queue_name_4 + - https://sqs.eu-west-1.amazonaws.com/901234567812/queue_name_5 + - https://sqs.sa-east-1.amazonaws.com/567890123489/queue_name_6 sqs_other: max_number_of_messages: 1 diff --git a/config.schema.yaml b/config.schema.yaml index 835a9387..51c3dda9 100755 --- a/config.schema.yaml +++ b/config.schema.yaml @@ -41,10 +41,15 @@ type: object properties: input_queues: - description: "Names of queue to fetch messages from Amazon SQS" + description: "Names or URLs of queues to fetch messages from Amazon SQS" type: "array" items: type: "string" + roles_arns: + type: "array" + description: "ARNs of the roles which allow queues to be fetched for messages" + items: + type: "string" sqs_other: type: object properties: diff --git a/sensors/sqs_sensor.py b/sensors/sqs_sensor.py index 77b5fcb0..ed2ce997 100755 --- a/sensors/sqs_sensor.py +++ b/sensors/sqs_sensor.py @@ -38,6 +38,8 @@ import six import json + +from collections import defaultdict from boto3.session import Session from botocore.exceptions import ClientError from botocore.exceptions import NoRegionError @@ -55,19 +57,25 @@ def __init__(self, sensor_service, config=None, poll_interval=5): def setup(self): self._logger = self._sensor_service.get_logger(name=self.__class__.__name__) - self.session = None - self.sqs_res = None + self.account_id = None + self.credentials = {} + self.sessions = {} + self.sqs_res = defaultdict(dict) def poll(self): # setting SQS ServiceResource object from the parameter of datastore or configuration file self._may_setup_sqs() for queue in self.input_queues: - msgs = self._receive_messages(queue=self._get_queue_by_name(queue), + account_id, region = self._get_info(queue) + msgs = self._receive_messages(queue=self._get_queue(queue, account_id, region), num_messages=self.max_number_of_messages) for msg in msgs: if msg: - payload = {"queue": queue, "body": json.loads(msg.body)} + payload = {"queue": queue, + "account_id": account_id, + "region": region, + "body": json.loads(msg.body)} self._sensor_service.dispatch(trigger="aws.sqs_new_message", payload=payload) msg.delete() @@ -89,7 +97,7 @@ def _get_config_entry(self, key, prefix=None): ''' Get configuration values either from Datastore or config file. ''' config = self.config if prefix: - config = self._config.get(prefix, {}) + config = self.config.get(prefix, {}) value = self._sensor_service.get_value('aws.%s' % (key), local=False) if not value: @@ -101,8 +109,16 @@ def _get_config_entry(self, key, prefix=None): return value def _may_setup_sqs(self): - queues = self._get_config_entry(key='input_queues', prefix='sqs_sensor') + self.access_key_id = self._get_config_entry('aws_access_key_id') + self.secret_access_key = self._get_config_entry('aws_secret_access_key') + self.aws_region = self._get_config_entry('region') + self.max_number_of_messages = self._get_config_entry('max_number_of_messages', + prefix='sqs_other') + if not self.account_id: + self._setup_session() + + queues = self._get_config_entry(key='input_queues', prefix='sqs_sensor') # XXX: This is a hack as from datastore we can only receive a string while # from config.yaml we can receive a list if isinstance(queues, six.string_types): @@ -112,50 +128,117 @@ def _may_setup_sqs(self): else: self.input_queues = [] - self.aws_access_key = self._get_config_entry('aws_access_key_id') - self.aws_secret_key = self._get_config_entry('aws_secret_access_key') - self.aws_region = self._get_config_entry('region') - - self.max_number_of_messages = self._get_config_entry('max_number_of_messages', - prefix='sqs_other') - # checker configuration is update, or not - def _is_same_credentials(): - c = self.session.get_credentials() - return c is not None and \ - c.access_key == self.aws_access_key and \ - c.secret_key == self.aws_secret_key and \ - self.session.region_name == self.aws_region + def _is_same_credentials(session, account_id): + if not session: + return False - if self.session is None or not _is_same_credentials(): - self._setup_sqs() - - def _setup_sqs(self): - ''' Setup Boto3 structures ''' - self._logger.debug('Setting up SQS resources') - self.session = Session(aws_access_key_id=self.aws_access_key, - aws_secret_access_key=self.aws_secret_key, - region_name=self.aws_region) + c = session.get_credentials() + return c is not None and \ + c.access_key == self.credentials[account_id][0] and \ + c.secret_key == self.credentials[account_id][1] and \ + (account_id == self.account_id or c.token == self.credentials[account_id][2]) + + # build a map between 'account_id' and its 'role arn' by parsing the matching config entry + cross_roles_arns = { + arn.split(':')[4]: arn + for arn in self._get_config_entry('roles_arns', 'sqs_sensor') or [] + } + required_accounts = {self._get_info(queue)[0] for queue in self.input_queues} + + for account_id in required_accounts: + if account_id != self.account_id and account_id not in cross_roles_arns: + continue + + session = self.sessions.get(account_id) + if not _is_same_credentials(session, account_id): + if account_id == self.account_id: + self._setup_session() + else: + self._setup_multiaccount_session(account_id, cross_roles_arns) + + def _setup_session(self): + ''' Setup Boto3 session ''' + session = Session(aws_access_key_id=self.access_key_id, + aws_secret_access_key=self.secret_access_key) + + if not self.account_id: + self.account_id = session.client('sts').get_caller_identity().get('Account') + self.credentials[self.account_id] = (self.access_key_id, self.secret_access_key, None) + + self.sessions[self.account_id] = session + self.sqs_res.pop(self.account_id, None) + + def _setup_multiaccount_session(self, account_id, cross_roles_arns): + ''' Assume role and setup session for the cross-account capability''' + try: + assumed_role = self.sessions[self.account_id].client('sts').assume_role( + RoleArn=cross_roles_arns[account_id], + RoleSessionName='StackStormEvents' + ) + except ClientError: + self._logger.error('Could not assume role on %s', account_id) + return + + self.credentials[account_id] = (assumed_role["Credentials"]["AccessKeyId"], + assumed_role["Credentials"]["SecretAccessKey"], + assumed_role["Credentials"]["SessionToken"]) + + session = Session( + aws_access_key_id=self.credentials[account_id][0], + aws_secret_access_key=self.credentials[account_id][1], + aws_session_token=self.credentials[account_id][2] + ) + self.sessions[account_id] = session + self.sqs_res.pop(account_id, None) + + def _setup_sqs(self, session, account_id, region): + ''' Setup SQS resources''' + if region in self.sqs_res[account_id]: + return self.sqs_res[account_id][region] try: - self.sqs_res = self.session.resource('sqs') + self.sqs_res[account_id][region] = session.resource('sqs', region_name=region) + return self.sqs_res[account_id][region] except NoRegionError: - self._logger.warning("The specified region '%s' is invalid", self.aws_region) + self._logger.error("The specified region '%s' for account %s is invalid.", + region, account_id) + + def _check_queue_if_url(self, queue): + return queue.startswith('http://') or queue.startswith('https://') + + def _get_info(self, queue): + ''' Retrieve the account ID and region from the queue URL ''' + if self._check_queue_if_url(queue): + return queue.split('/')[3], queue.split('.')[1] + return self.account_id, self.aws_region + + def _get_queue(self, queue, account_id, region): + ''' Fetch QUEUE by its name or URL and create new one if queue doesn't exist ''' + if account_id not in self.sessions: + self._logger.error('Session for account id %s does not exist', account_id) + return None + + sqs_res = self._setup_sqs(self.sessions[account_id], account_id, region) + if sqs_res is None: + return None - def _get_queue_by_name(self, queueName): - ''' Fetch QUEUE by it's name create new one if queue doesn't exist ''' try: - return self.sqs_res.get_queue_by_name(QueueName=queueName) + if self._check_queue_if_url(queue): + return sqs_res.Queue(queue) + return sqs_res.get_queue_by_name(QueueName=queue) except ClientError as e: if e.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue': - self._logger.warning("SQS Queue: %s doesn't exist, creating it.", queueName) - return self.sqs_res.create_queue(QueueName=queueName) + self._logger.warning("SQS Queue: %s doesn't exist, creating it.", queue) + if self._check_queue_if_url(queue): + return sqs_res.create_queue(QueueName=queue.split('/')[4]) + return sqs_res.create_queue(QueueName=queue) elif e.response['Error']['Code'] == 'InvalidClientTokenId': - self._logger.warning("Cloudn't operate sqs because of invalid credential config") + self._logger.warning("Couldn't operate sqs because of invalid credential config") else: raise except NoCredentialsError: - self._logger.warning("Cloudn't operate sqs because of invalid credential config") + self._logger.warning("Couldn't operate sqs because of invalid credential config") except EndpointConnectionError as e: self._logger.warning(e) diff --git a/sensors/sqs_sensor.yaml b/sensors/sqs_sensor.yaml index 2bb4ca34..243186d5 100755 --- a/sensors/sqs_sensor.yaml +++ b/sensors/sqs_sensor.yaml @@ -12,5 +12,9 @@ properties: queue: type: "string" + account_id: + type: "string" + region: + type: "string" body: type: "object" diff --git a/tests/fixtures/blank.yaml b/tests/fixtures/blank.yaml index ed97d539..73b314ff 100644 --- a/tests/fixtures/blank.yaml +++ b/tests/fixtures/blank.yaml @@ -1 +1 @@ ---- +--- \ No newline at end of file diff --git a/tests/fixtures/full.yaml b/tests/fixtures/full.yaml index 8b8547e2..ffbaefde 100755 --- a/tests/fixtures/full.yaml +++ b/tests/fixtures/full.yaml @@ -10,7 +10,8 @@ service_notifications_sensor: path: "/my-path" sqs_sensor: - input_queues: "input_queue" + input_queues: + - "input_queue" sqs_other: max_number_of_messages: 1 diff --git a/tests/fixtures/mixed.yaml b/tests/fixtures/mixed.yaml new file mode 100644 index 00000000..7a22a5de --- /dev/null +++ b/tests/fixtures/mixed.yaml @@ -0,0 +1,20 @@ +--- +aws_access_key_id: "access_key_id" +aws_secret_access_key: "secret_key" +region: "us-west-1" +st2_user_data: "" + +service_notifications_sensor: + host: "localhost" + port: 12345 + path: "/my-path" + +sqs_sensor: + roles_arns: + - "arn:aws:iam::345678901223:role/rolename1" + input_queues: + - "input_queue" + - "https://sqs.us-east-1.amazonaws.com/345678901223/queue_name_2" + +sqs_other: + max_number_of_messages: 1 diff --git a/tests/fixtures/multiaccount.yaml b/tests/fixtures/multiaccount.yaml new file mode 100644 index 00000000..1d76b78f --- /dev/null +++ b/tests/fixtures/multiaccount.yaml @@ -0,0 +1,19 @@ +--- +aws_access_key_id: "access_key_id" +aws_secret_access_key: "secret_key" +region: "us-west-1" +st2_user_data: "" + +service_notifications_sensor: + host: "localhost" + port: 12345 + path: "/my-path" + +sqs_sensor: + roles_arns: + - "arn:aws:iam::345678901223:role/rolename1" + input_queues: + - "https://sqs.us-east-1.amazonaws.com/345678901223/queue_name_2" + +sqs_other: + max_number_of_messages: 1 diff --git a/tests/test_sensor_sqs.py b/tests/test_sensor_sqs.py index 1e62f61c..7692fb11 100644 --- a/tests/test_sensor_sqs.py +++ b/tests/test_sensor_sqs.py @@ -4,6 +4,7 @@ from boto3.session import Session from botocore.exceptions import ClientError from botocore.exceptions import NoCredentialsError +from botocore.exceptions import NoRegionError from botocore.exceptions import EndpointConnectionError from st2tests.base import BaseSensorTestCase @@ -20,6 +21,24 @@ def __init__(self, msgs=[]): def get_queue_by_name(self, **kwargs): return SQSSensorTestCase.MockQueue(self.msgs) + def Queue(self, queue): + return SQSSensorTestCase.MockQueue(self.msgs) + + class MockResourceNonExistentQueue(object): + def __init__(self, msgs=[]): + self.msgs = msgs + + def get_queue_by_name(self, **kwargs): + raise ClientError({'Error': {'Code': 'AWS.SimpleQueueService.NonExistentQueue'}}, + 'sqs_test') + + def Queue(self, queue): + raise ClientError({'Error': {'Code': 'AWS.SimpleQueueService.NonExistentQueue'}}, + 'sqs_test') + + def create_queue(self, **kwargs): + return SQSSensorTestCase.MockQueue(self.msgs) + class MockResourceRaiseClientError(object): def __init__(self, error_code=''): self.error_code = error_code @@ -27,14 +46,45 @@ def __init__(self, error_code=''): def get_queue_by_name(self, **kwargs): raise ClientError({'Error': {'Code': self.error_code}}, 'sqs_test') + def Queue(self, queue): + raise ClientError({'Error': {'Code': self.error_code}}, 'sqs_test') + class MockResourceRaiseNoCredentialsError(object): def get_queue_by_name(self, **kwargs): raise NoCredentialsError() + def Queue(self, queue): + raise NoCredentialsError() + class MockResourceRaiseEndpointConnectionError(object): def get_queue_by_name(self, **kwargs): raise EndpointConnectionError(endpoint_url='') + def Queue(self, queue): + raise EndpointConnectionError(endpoint_url='') + + class MockStsClient(object): + def __init__(self): + self.meta = mock.Mock(service_model={}) + + def get_caller_identity(self): + ci = mock.Mock() + ci.get = lambda attribute: '111222333444' if attribute == 'Account' else None + return ci + + def assume_role(self, RoleArn, RoleSessionName): + return { + 'Credentials': { + 'AccessKeyId': 'access_key_id_example', + 'SecretAccessKey': 'secret_access_key_example', + 'SessionToken': 'session_token_example' + } + } + + class MockStsClientRaiseClientError(MockStsClient): + def assume_role(self, RoleArn, RoleSessionName): + raise ClientError({'Error': {'Code': 'AccessDenied'}}, 'sqs_test') + class MockQueue(object): def __init__(self, msgs=[]): self.dummy_messages = [SQSSensorTestCase.MockMessage(x) for x in msgs] @@ -54,10 +104,13 @@ def setUp(self): self.full_config = self.load_yaml('full.yaml') self.blank_config = self.load_yaml('blank.yaml') + self.multiaccount_config = self.load_yaml('multiaccount.yaml') + self.mixed_config = self.load_yaml('mixed.yaml') def load_yaml(self, filename): return yaml.safe_load(self.get_fixture_content(filename)) + @mock.patch.object(Session, 'client', mock.Mock(return_value=MockStsClient())) def test_poll_with_blank_config(self): sensor = self.get_sensor_instance(config=self.blank_config) @@ -66,18 +119,29 @@ def test_poll_with_blank_config(self): self.assertEqual(self.get_dispatched_triggers(), []) + @mock.patch.object(Session, 'client', mock.Mock(return_value=MockStsClient())) @mock.patch.object(Session, 'resource', mock.Mock(return_value=MockResource())) - def test_poll_without_message(self): - sensor = self.get_sensor_instance(config=self.full_config) + def _poll_without_message(self, config): + sensor = self.get_sensor_instance(config=config) sensor.setup() sensor.poll() self.assertEqual(self.get_dispatched_triggers(), []) + def test_poll_without_message_full_config(self): + self._poll_without_message(self.full_config) + + def test_poll_without_message_multiaccount_config(self): + self._poll_without_message(self.multiaccount_config) + + def test_poll_without_message_mixed_config(self): + self._poll_without_message(self.mixed_config) + + @mock.patch.object(Session, 'client', mock.Mock(return_value=MockStsClient())) @mock.patch.object(Session, 'resource', mock.Mock(return_value=MockResource(['{"foo":"bar"}']))) - def test_poll_with_message(self): - sensor = self.get_sensor_instance(config=self.full_config) + def _poll_with_message(self, config): + sensor = self.get_sensor_instance(config=config) sensor.setup() sensor.poll() @@ -85,9 +149,39 @@ def test_poll_with_message(self): self.assertTriggerDispatched(trigger='aws.sqs_new_message') self.assertNotEqual(self.get_dispatched_triggers(), []) - @mock.patch.object(Session, 'resource', mock.Mock(return_value=MockResource(['{"foo":"bar"}']))) + def test_poll_with_message_full_config(self): + self._poll_with_message(self.full_config) + + def test_poll_with_message_multiaccount_config(self): + self._poll_with_message(self.multiaccount_config) + + @mock.patch.object(Session, 'client', mock.Mock(return_value=MockStsClient())) + @mock.patch.object(Session, 'resource', + mock.Mock(return_value=MockResourceNonExistentQueue(['{"foo":"bar"}']))) + def _poll_with_non_existent_queue(self, config): + sensor = self.get_sensor_instance(config=config) + + sensor.setup() + sensor.poll() + + contexts = self.get_dispatched_triggers() + self.assertNotEqual(contexts, []) + self.assertTriggerDispatched(trigger='aws.sqs_new_message') + + def test_poll_with_non_existent_queue_full_config(self): + self._poll_with_non_existent_queue(self.full_config) + + def test_poll_with_non_existent_queue_multiaccount_config(self): + self._poll_with_non_existent_queue(self.multiaccount_config) + + @mock.patch.object(Session, 'client', mock.Mock(return_value=MockStsClient())) + @mock.patch.object(Session, 'resource', + mock.Mock(return_value=MockResource(['{"foo":"bar"}']))) def test_set_input_queues_config_dynamically(self): sensor = self.get_sensor_instance(config=self.blank_config) + sensor._sensor_service.set_value('aws.roles_arns', + ['arn:aws:iam::123456789098:role/rolename1'], + local=False) sensor.setup() # set credential mock to prevent sending request to AWS @@ -104,18 +198,35 @@ def test_set_input_queues_config_dynamically(self): sensor._sensor_service.set_value('aws.input_queues', 'fuga,puyo', local=False) sensor.poll() + # update input_queues to check this is reflected + sensor._sensor_service.set_value( + 'aws.input_queues', + 'https://sqs.us-west-2.amazonaws.com/123456789098/queue_name_3', + local=False + ) + sensor.poll() + contexts = self.get_dispatched_triggers() self.assertNotEqual(contexts, []) self.assertTriggerDispatched(trigger='aws.sqs_new_message') # get message from queue 'hoge', 'fuga' then 'puyo' - self.assertEqual([x['payload']['queue'] for x in contexts], ['hoge', 'fuga', 'puyo']) + self.assertEqual([x['payload']['queue'] for x in contexts], + ['hoge', 'fuga', 'puyo', + 'https://sqs.us-west-2.amazonaws.com/123456789098/queue_name_3']) - @mock.patch.object(Session, 'resource', mock.Mock(return_value=MockResource(['{"foo":"bar"}']))) + @mock.patch.object(Session, 'client', mock.Mock(return_value=MockStsClient())) + @mock.patch.object(Session, 'resource', + mock.Mock(return_value=MockResource(['{"foo":"bar"}']))) def test_set_input_queues_config_with_list(self): # set 'input_queues' config with list type config = self.full_config - config['sqs_sensor']['input_queues'] = ['foo', 'bar'] + config['sqs_sensor']['input_queues'] = [ + 'foo', + 'bar', + 'https://sqs.us-west-2.amazonaws.com/123456789098/queue_name_3' + ] + config['sqs_sensor']['roles_arns'] = ['arn:aws:iam::123456789098:role/rolename1'] sensor = self.get_sensor_instance(config=config) sensor.setup() @@ -124,34 +235,118 @@ def test_set_input_queues_config_with_list(self): contexts = self.get_dispatched_triggers() self.assertNotEqual(contexts, []) self.assertTriggerDispatched(trigger='aws.sqs_new_message') - self.assertEqual([x['payload']['queue'] for x in contexts], ['foo', 'bar']) + self.assertEqual([x['payload']['queue'] for x in contexts], + ['foo', 'bar', + 'https://sqs.us-west-2.amazonaws.com/123456789098/queue_name_3']) + @mock.patch.object(Session, 'client', mock.Mock(return_value=MockStsClient())) @mock.patch.object(Session, 'resource', - mock.Mock(return_value=MockResourceRaiseClientError('InvalidClientTokenId'))) - def test_fails_with_invalid_token(self): - sensor = self.get_sensor_instance(config=self.full_config) + mock.Mock( + return_value=MockResourceRaiseClientError('InvalidClientTokenId')) + ) + def _fails_with_invalid_token(self, config): + sensor = self.get_sensor_instance(config=config) sensor.setup() sensor.poll() self.assertEqual(self.get_dispatched_triggers(), []) + def test_fails_with_invalid_token_full_config(self): + self._fails_with_invalid_token(self.full_config) + + def test_fails_with_invalid_token_multiaccount_config(self): + self._fails_with_invalid_token(self.multiaccount_config) + + @mock.patch.object(Session, 'client', mock.Mock(return_value=MockStsClient())) @mock.patch.object(Session, 'resource', mock.Mock(return_value=MockResourceRaiseNoCredentialsError())) - def test_fails_without_credentials(self): - sensor = self.get_sensor_instance(config=self.full_config) + def _fails_without_credentials(self, config): + sensor = self.get_sensor_instance(config=config) sensor.setup() sensor.poll() self.assertEqual(self.get_dispatched_triggers(), []) + def test_fails_without_credentials_full_config(self): + self._fails_without_credentials(self.full_config) + + def test_fails_without_credentials_multiaccount_config(self): + self._fails_without_credentials(self.multiaccount_config) + + @mock.patch.object(Session, 'client', mock.Mock(return_value=MockStsClient())) @mock.patch.object(Session, 'resource', mock.Mock(return_value=MockResourceRaiseEndpointConnectionError())) - def test_fails_with_invalid_region(self): - sensor = self.get_sensor_instance(config=self.full_config) + def _fails_with_invalid_region(self, config): + sensor = self.get_sensor_instance(config=config) sensor.setup() sensor.poll() self.assertEqual(self.get_dispatched_triggers(), []) + + def test_fails_with_invalid_region_full_config(self): + self._fails_with_invalid_region(self.full_config) + + def test_fails_with_invalid_region_multiaccount_config(self): + self._fails_with_invalid_region(self.multiaccount_config) + + @mock.patch.object(Session, 'client', + mock.Mock(return_value=MockStsClientRaiseClientError())) + @mock.patch.object(Session, 'resource', + mock.Mock(return_value=MockResource(['{"foo":"bar"}']))) + def _fails_assuming_role(self, config): + sensor = self.get_sensor_instance(config=config) + + sensor.setup() + sensor.poll() + + def test_fails_assuming_role_full_config(self): + self._fails_assuming_role(self.full_config) + + self.assertTriggerDispatched(trigger='aws.sqs_new_message') + self.assertNotEqual(self.get_dispatched_triggers(), []) + + def test_fails_assuming_role_multiaccount_config(self): + self._fails_assuming_role(self.multiaccount_config) + self.assertEqual(self.get_dispatched_triggers(), []) + + @mock.patch.object(Session, 'client', mock.Mock(return_value=MockStsClient())) + @mock.patch.object(Session, 'resource', + mock.Mock(side_effect=NoRegionError( + service_name='sqs', region_name='us-east-1'))) + def test_fails_creating_sqs_resource(self): + sensor = self.get_sensor_instance(config=self.mixed_config) + + sensor.setup() + sensor.poll() + + self.assertEqual(self.get_dispatched_triggers(), []) + + @mock.patch.object(Session, 'client', mock.Mock(return_value=MockStsClient())) + @mock.patch.object(Session, 'resource', + mock.Mock(return_value=MockResource(['{"foo":"bar"}']))) + def _poll_with_missing_arn(self, config): + config['sqs_sensor']['roles_arns'] = [] + + sensor = self.get_sensor_instance(config=config) + sensor.setup() + sensor.poll() + + def test_poll_with_missing_arn_full_config(self): + self._poll_with_missing_arn(self.full_config) + + self.assertNotEqual(self.get_dispatched_triggers(), []) + self.assertTriggerDispatched(trigger='aws.sqs_new_message') + + def test_poll_with_missing_arn_multiaccount_config(self): + self._poll_with_missing_arn(self.multiaccount_config) + + self.assertEqual(self.get_dispatched_triggers(), []) + + def test_poll_with_missing_arn_mixed_config(self): + self._poll_with_missing_arn(self.mixed_config) + + self.assertNotEqual(self.get_dispatched_triggers(), []) + self.assertTriggerDispatched(trigger='aws.sqs_new_message') From 64aec5502545dc5ce22cdc1db963ec6cd67bfed2 Mon Sep 17 00:00:00 2001 From: Stefan Gusa Date: Thu, 19 Dec 2019 12:13:57 +0200 Subject: [PATCH 2/8] Changed queues management --- sensors/sqs_sensor.py | 51 ++++++++++++++++++++++++++++++------------- 1 file changed, 36 insertions(+), 15 deletions(-) diff --git a/sensors/sqs_sensor.py b/sensors/sqs_sensor.py index ed2ce997..8d16c4ff 100755 --- a/sensors/sqs_sensor.py +++ b/sensors/sqs_sensor.py @@ -38,13 +38,14 @@ import six import json +import sys -from collections import defaultdict from boto3.session import Session from botocore.exceptions import ClientError from botocore.exceptions import NoRegionError from botocore.exceptions import NoCredentialsError from botocore.exceptions import EndpointConnectionError +from collections import defaultdict from st2reactor.sensor.base import PollingSensor @@ -122,9 +123,10 @@ def _may_setup_sqs(self): # XXX: This is a hack as from datastore we can only receive a string while # from config.yaml we can receive a list if isinstance(queues, six.string_types): - self.input_queues = [x.strip() for x in queues.split(',')] + self.input_queues = [six.moves.urllib.parse.urlparse(x.strip()) for x in + queues.split(',')] elif isinstance(queues, list): - self.input_queues = queues + self.input_queues = [six.moves.urllib.parse.urlparse(queue) for queue in queues] else: self.input_queues = [] @@ -204,14 +206,35 @@ def _setup_sqs(self, session, account_id, region): self._logger.error("The specified region '%s' for account %s is invalid.", region, account_id) - def _check_queue_if_url(self, queue): - return queue.startswith('http://') or queue.startswith('https://') - def _get_info(self, queue): ''' Retrieve the account ID and region from the queue URL ''' - if self._check_queue_if_url(queue): - return queue.split('/')[3], queue.split('.')[1] - return self.account_id, self.aws_region + # Pull default values from previous configuration + account_id = self.account_id + aws_region = self.aws_region + + # Netloc will be empty if the queue is just a name + if queue.netloc: + try: + account_id = queue.path.split('/')[1] + except IndexError as e: + six.reraise(type(e), type(e)( + "Queue URL must contain the account ID as the first part of the path, " + "eg: https://.../"), + sys.exec_info()[2]) + else: + self._logger.debug("Using %s as account_id", account_id) + + try: + aws_region = queue.netloc.split('.')[1] + except IndexError: + six.reraise(type(e), type(e)( + "Queue URL must contain the AWS region, " + "eg: https://sqs..amazonaws.com/..."), + sys.exec_info()[2]) + else: + self._logger.debug("Using %s as the AWS region", aws_region) + + return account_id, aws_region def _get_queue(self, queue, account_id, region): ''' Fetch QUEUE by its name or URL and create new one if queue doesn't exist ''' @@ -224,15 +247,13 @@ def _get_queue(self, queue, account_id, region): return None try: - if self._check_queue_if_url(queue): - return sqs_res.Queue(queue) - return sqs_res.get_queue_by_name(QueueName=queue) + if queue.netloc: + return sqs_res.Queue(six.moves.urllib.parse.urlunparse(queue)) + return sqs_res.get_queue_by_name(QueueName=queue.path.split('/')[-1]) except ClientError as e: if e.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue': self._logger.warning("SQS Queue: %s doesn't exist, creating it.", queue) - if self._check_queue_if_url(queue): - return sqs_res.create_queue(QueueName=queue.split('/')[4]) - return sqs_res.create_queue(QueueName=queue) + return sqs_res.create_queue(QueueName=queue.path.split('/')[-1]) elif e.response['Error']['Code'] == 'InvalidClientTokenId': self._logger.warning("Couldn't operate sqs because of invalid credential config") else: From 4b7245b5e81ac6f0759990e019caeefa007c6f70 Mon Sep 17 00:00:00 2001 From: Stefan Gusa Date: Fri, 20 Dec 2019 11:54:16 +0200 Subject: [PATCH 3/8] Syntax changes --- aws.yaml.example | 2 +- config.schema.yaml | 2 +- sensors/sqs_sensor.py | 20 ++++++++++---------- tests/fixtures/blank.yaml | 2 +- tests/fixtures/mixed.yaml | 2 +- tests/fixtures/multiaccount.yaml | 2 +- tests/test_sensor_sqs.py | 17 ++++++++++------- 7 files changed, 25 insertions(+), 22 deletions(-) diff --git a/aws.yaml.example b/aws.yaml.example index 0b0a43e4..406b45cf 100755 --- a/aws.yaml.example +++ b/aws.yaml.example @@ -11,7 +11,7 @@ service_notifications_sensor: path: /my-path sqs_sensor: - roles_arns: + roles: - arn:aws:iam::123456789098:role/rolename1 - arn:aws:iam::901234567812:role/rolename2 - arn:aws:iam::567890123489:role/rolename3 diff --git a/config.schema.yaml b/config.schema.yaml index 51c3dda9..e6ed811e 100755 --- a/config.schema.yaml +++ b/config.schema.yaml @@ -45,7 +45,7 @@ type: "array" items: type: "string" - roles_arns: + roles: type: "array" description: "ARNs of the roles which allow queues to be fetched for messages" items: diff --git a/sensors/sqs_sensor.py b/sensors/sqs_sensor.py index 8d16c4ff..21f3561d 100755 --- a/sensors/sqs_sensor.py +++ b/sensors/sqs_sensor.py @@ -141,15 +141,15 @@ def _is_same_credentials(session, account_id): c.secret_key == self.credentials[account_id][1] and \ (account_id == self.account_id or c.token == self.credentials[account_id][2]) - # build a map between 'account_id' and its 'role arn' by parsing the matching config entry - cross_roles_arns = { + # Build a map between 'account_id' and its 'role arn' by parsing the matching config entry + cross_roles = { arn.split(':')[4]: arn - for arn in self._get_config_entry('roles_arns', 'sqs_sensor') or [] + for arn in self._get_config_entry('roles', 'sqs_sensor') or [] } required_accounts = {self._get_info(queue)[0] for queue in self.input_queues} for account_id in required_accounts: - if account_id != self.account_id and account_id not in cross_roles_arns: + if account_id != self.account_id and account_id not in cross_roles: continue session = self.sessions.get(account_id) @@ -157,7 +157,7 @@ def _is_same_credentials(session, account_id): if account_id == self.account_id: self._setup_session() else: - self._setup_multiaccount_session(account_id, cross_roles_arns) + self._setup_multiaccount_session(account_id, cross_roles) def _setup_session(self): ''' Setup Boto3 session ''' @@ -171,11 +171,11 @@ def _setup_session(self): self.sessions[self.account_id] = session self.sqs_res.pop(self.account_id, None) - def _setup_multiaccount_session(self, account_id, cross_roles_arns): + def _setup_multiaccount_session(self, account_id, cross_roles): ''' Assume role and setup session for the cross-account capability''' try: assumed_role = self.sessions[self.account_id].client('sts').assume_role( - RoleArn=cross_roles_arns[account_id], + RoleArn=cross_roles[account_id], RoleSessionName='StackStormEvents' ) except ClientError: @@ -219,8 +219,8 @@ def _get_info(self, queue): except IndexError as e: six.reraise(type(e), type(e)( "Queue URL must contain the account ID as the first part of the path, " - "eg: https://.../"), - sys.exec_info()[2]) + "eg: https://sqs..amazonaws.com//"), + sys.exc_info()[2]) else: self._logger.debug("Using %s as account_id", account_id) @@ -230,7 +230,7 @@ def _get_info(self, queue): six.reraise(type(e), type(e)( "Queue URL must contain the AWS region, " "eg: https://sqs..amazonaws.com/..."), - sys.exec_info()[2]) + sys.exc_info()[2]) else: self._logger.debug("Using %s as the AWS region", aws_region) diff --git a/tests/fixtures/blank.yaml b/tests/fixtures/blank.yaml index 73b314ff..ed97d539 100644 --- a/tests/fixtures/blank.yaml +++ b/tests/fixtures/blank.yaml @@ -1 +1 @@ ---- \ No newline at end of file +--- diff --git a/tests/fixtures/mixed.yaml b/tests/fixtures/mixed.yaml index 7a22a5de..38910aa0 100644 --- a/tests/fixtures/mixed.yaml +++ b/tests/fixtures/mixed.yaml @@ -10,7 +10,7 @@ service_notifications_sensor: path: "/my-path" sqs_sensor: - roles_arns: + roles: - "arn:aws:iam::345678901223:role/rolename1" input_queues: - "input_queue" diff --git a/tests/fixtures/multiaccount.yaml b/tests/fixtures/multiaccount.yaml index 1d76b78f..08b0de25 100644 --- a/tests/fixtures/multiaccount.yaml +++ b/tests/fixtures/multiaccount.yaml @@ -10,7 +10,7 @@ service_notifications_sensor: path: "/my-path" sqs_sensor: - roles_arns: + roles: - "arn:aws:iam::345678901223:role/rolename1" input_queues: - "https://sqs.us-east-1.amazonaws.com/345678901223/queue_name_2" diff --git a/tests/test_sensor_sqs.py b/tests/test_sensor_sqs.py index 7692fb11..d997a4f2 100644 --- a/tests/test_sensor_sqs.py +++ b/tests/test_sensor_sqs.py @@ -1,4 +1,5 @@ import mock +import six import yaml from boto3.session import Session @@ -179,7 +180,7 @@ def test_poll_with_non_existent_queue_multiaccount_config(self): mock.Mock(return_value=MockResource(['{"foo":"bar"}']))) def test_set_input_queues_config_dynamically(self): sensor = self.get_sensor_instance(config=self.blank_config) - sensor._sensor_service.set_value('aws.roles_arns', + sensor._sensor_service.set_value('aws.roles', ['arn:aws:iam::123456789098:role/rolename1'], local=False) sensor.setup() @@ -212,8 +213,9 @@ def test_set_input_queues_config_dynamically(self): # get message from queue 'hoge', 'fuga' then 'puyo' self.assertEqual([x['payload']['queue'] for x in contexts], - ['hoge', 'fuga', 'puyo', - 'https://sqs.us-west-2.amazonaws.com/123456789098/queue_name_3']) + [six.moves.urllib.parse.urlparse(queue) for queue in + ['hoge', 'fuga', 'puyo', + 'https://sqs.us-west-2.amazonaws.com/123456789098/queue_name_3']]) @mock.patch.object(Session, 'client', mock.Mock(return_value=MockStsClient())) @mock.patch.object(Session, 'resource', @@ -226,7 +228,7 @@ def test_set_input_queues_config_with_list(self): 'bar', 'https://sqs.us-west-2.amazonaws.com/123456789098/queue_name_3' ] - config['sqs_sensor']['roles_arns'] = ['arn:aws:iam::123456789098:role/rolename1'] + config['sqs_sensor']['roles'] = ['arn:aws:iam::123456789098:role/rolename1'] sensor = self.get_sensor_instance(config=config) sensor.setup() @@ -236,8 +238,9 @@ def test_set_input_queues_config_with_list(self): self.assertNotEqual(contexts, []) self.assertTriggerDispatched(trigger='aws.sqs_new_message') self.assertEqual([x['payload']['queue'] for x in contexts], - ['foo', 'bar', - 'https://sqs.us-west-2.amazonaws.com/123456789098/queue_name_3']) + [six.moves.urllib.parse.urlparse(queue) for queue in + ['foo', 'bar', + 'https://sqs.us-west-2.amazonaws.com/123456789098/queue_name_3']]) @mock.patch.object(Session, 'client', mock.Mock(return_value=MockStsClient())) @mock.patch.object(Session, 'resource', @@ -328,7 +331,7 @@ def test_fails_creating_sqs_resource(self): @mock.patch.object(Session, 'resource', mock.Mock(return_value=MockResource(['{"foo":"bar"}']))) def _poll_with_missing_arn(self, config): - config['sqs_sensor']['roles_arns'] = [] + config['sqs_sensor']['roles'] = [] sensor = self.get_sensor_instance(config=config) sensor.setup() From ae4a6f0dabca4751ebca415a8200aeda9fd093fd Mon Sep 17 00:00:00 2001 From: Stefan Gusa Date: Thu, 9 Apr 2020 19:46:57 +0300 Subject: [PATCH 4/8] Retry if session token expires when polling from SQS instance --- sensors/sqs_sensor.py | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/sensors/sqs_sensor.py b/sensors/sqs_sensor.py index 21f3561d..c01923f5 100755 --- a/sensors/sqs_sensor.py +++ b/sensors/sqs_sensor.py @@ -61,6 +61,7 @@ def setup(self): self.account_id = None self.credentials = {} self.sessions = {} + self.cross_roles = {} self.sqs_res = defaultdict(dict) def poll(self): @@ -69,8 +70,18 @@ def poll(self): for queue in self.input_queues: account_id, region = self._get_info(queue) - msgs = self._receive_messages(queue=self._get_queue(queue, account_id, region), - num_messages=self.max_number_of_messages) + + while True: + try: + msgs = self._receive_messages(queue=self._get_queue(queue, account_id, region), + num_messages=self.max_number_of_messages) + except ClientError as e: + if e.response['Error']['Code'] == 'ExpiredToken': + self._setup_multiaccount_session(account_id) + continue + raise + break + for msg in msgs: if msg: payload = {"queue": queue, @@ -142,14 +153,14 @@ def _is_same_credentials(session, account_id): (account_id == self.account_id or c.token == self.credentials[account_id][2]) # Build a map between 'account_id' and its 'role arn' by parsing the matching config entry - cross_roles = { + self.cross_roles = { arn.split(':')[4]: arn for arn in self._get_config_entry('roles', 'sqs_sensor') or [] } required_accounts = {self._get_info(queue)[0] for queue in self.input_queues} for account_id in required_accounts: - if account_id != self.account_id and account_id not in cross_roles: + if account_id != self.account_id and account_id not in self.cross_roles: continue session = self.sessions.get(account_id) @@ -157,7 +168,7 @@ def _is_same_credentials(session, account_id): if account_id == self.account_id: self._setup_session() else: - self._setup_multiaccount_session(account_id, cross_roles) + self._setup_multiaccount_session(account_id, self.cross_roles) def _setup_session(self): ''' Setup Boto3 session ''' @@ -171,11 +182,11 @@ def _setup_session(self): self.sessions[self.account_id] = session self.sqs_res.pop(self.account_id, None) - def _setup_multiaccount_session(self, account_id, cross_roles): + def _setup_multiaccount_session(self, account_id): ''' Assume role and setup session for the cross-account capability''' try: assumed_role = self.sessions[self.account_id].client('sts').assume_role( - RoleArn=cross_roles[account_id], + RoleArn=self.cross_roles[account_id], RoleSessionName='StackStormEvents' ) except ClientError: From 92ca375fc202664a9a5db9545e0dbf8173e6225e Mon Sep 17 00:00:00 2001 From: Stefan Gusa Date: Thu, 9 Apr 2020 20:09:55 +0300 Subject: [PATCH 5/8] minor change --- sensors/sqs_sensor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sensors/sqs_sensor.py b/sensors/sqs_sensor.py index c01923f5..74659629 100755 --- a/sensors/sqs_sensor.py +++ b/sensors/sqs_sensor.py @@ -237,7 +237,7 @@ def _get_info(self, queue): try: aws_region = queue.netloc.split('.')[1] - except IndexError: + except IndexError as e: six.reraise(type(e), type(e)( "Queue URL must contain the AWS region, " "eg: https://sqs..amazonaws.com/..."), From 1ec7ab710652ac479726eb2c049f1cb3fc650498 Mon Sep 17 00:00:00 2001 From: Stefan Gusa Date: Thu, 9 Apr 2020 20:54:58 +0300 Subject: [PATCH 6/8] minor change --- sensors/sqs_sensor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sensors/sqs_sensor.py b/sensors/sqs_sensor.py index 74659629..3076626b 100755 --- a/sensors/sqs_sensor.py +++ b/sensors/sqs_sensor.py @@ -168,7 +168,7 @@ def _is_same_credentials(session, account_id): if account_id == self.account_id: self._setup_session() else: - self._setup_multiaccount_session(account_id, self.cross_roles) + self._setup_multiaccount_session(account_id) def _setup_session(self): ''' Setup Boto3 session ''' From 95e30956365f0382abaea08887c486d1ff1006e6 Mon Sep 17 00:00:00 2001 From: Stefan Gusa Date: Thu, 9 Apr 2020 21:09:28 +0300 Subject: [PATCH 7/8] Updated CHANDES.md and pack.yaml --- CHANGES.md | 3 +++ pack.yaml | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index db93d5f2..bb66d751 100755 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,8 @@ # Changelog +## 1.2.4 +- `sqs_sensor` supports multiaccount integration. + ## 1.2.3 - Support Python 3 everywhere diff --git a/pack.yaml b/pack.yaml index aa0b4da7..329e6a22 100755 --- a/pack.yaml +++ b/pack.yaml @@ -19,7 +19,7 @@ keywords: - SQS - lambda - kinesis -version : 1.2.3 +version : 1.3.0 author : StackStorm, Inc. email : info@stackstorm.com python_versions: From c02f53d0c21dd44406b336c6ba353e9b7f24330e Mon Sep 17 00:00:00 2001 From: Nick Maludy Date: Thu, 9 Apr 2020 14:12:42 -0400 Subject: [PATCH 8/8] Update CHANGES.md --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index bb66d751..b868e57d 100755 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,6 @@ # Changelog -## 1.2.4 +## 1.3.0 - `sqs_sensor` supports multiaccount integration. ## 1.2.3