Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 1.3.0
- `sqs_sensor` supports multiaccount integration.

## 1.2.3
- Support Python 3 everywhere

Expand Down
11 changes: 10 additions & 1 deletion aws.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,17 @@ service_notifications_sensor:
path: /my-path

sqs_sensor:
roles:
- arn:aws:iam::123456789098:role/rolename1
- arn:aws:iam::901234567812:role/rolename2
- arn:aws:iam::567890123489:role/rolename3
input_queues:
- queuename
- queue_name_1
- https://sqs.us-east-1.amazonaws.com/567890123489/queue_name_2
- https://sqs.us-west-2.amazonaws.com/123456789098/queue_name_3
- https://sqs.us-west-2.amazonaws.com/123456789098/queue_name_4
- https://sqs.eu-west-1.amazonaws.com/901234567812/queue_name_5
- https://sqs.sa-east-1.amazonaws.com/567890123489/queue_name_6

sqs_other:
max_number_of_messages: 1
7 changes: 6 additions & 1 deletion config.schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,15 @@
type: object
properties:
input_queues:
description: "Names of queue to fetch messages from Amazon SQS"
description: "Names or URLs of queues to fetch messages from Amazon SQS"
type: "array"
items:
type: "string"
roles:
type: "array"
description: "ARNs of the roles which allow queues to be fetched for messages"
items:
type: "string"
sqs_other:
type: object
properties:
Expand Down
2 changes: 1 addition & 1 deletion pack.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ keywords:
- SQS
- lambda
- kinesis
version : 1.2.3
version : 1.3.0
author : StackStorm, Inc.
email : info@stackstorm.com
python_versions:
Expand Down
191 changes: 153 additions & 38 deletions sensors/sqs_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@

import six
import json
import sys

from boto3.session import Session
from botocore.exceptions import ClientError
from botocore.exceptions import NoRegionError
from botocore.exceptions import NoCredentialsError
from botocore.exceptions import EndpointConnectionError
from collections import defaultdict

from st2reactor.sensor.base import PollingSensor

Expand All @@ -55,19 +58,36 @@ def __init__(self, sensor_service, config=None, poll_interval=5):
def setup(self):
self._logger = self._sensor_service.get_logger(name=self.__class__.__name__)

self.session = None
self.sqs_res = None
self.account_id = None
self.credentials = {}
self.sessions = {}
self.cross_roles = {}
self.sqs_res = defaultdict(dict)

def poll(self):
# setting SQS ServiceResource object from the parameter of datastore or configuration file
self._may_setup_sqs()

for queue in self.input_queues:
msgs = self._receive_messages(queue=self._get_queue_by_name(queue),
num_messages=self.max_number_of_messages)
account_id, region = self._get_info(queue)

while True:
try:
msgs = self._receive_messages(queue=self._get_queue(queue, account_id, region),
num_messages=self.max_number_of_messages)
except ClientError as e:
if e.response['Error']['Code'] == 'ExpiredToken':
self._setup_multiaccount_session(account_id)
continue
raise
break

for msg in msgs:
if msg:
payload = {"queue": queue, "body": json.loads(msg.body)}
payload = {"queue": queue,
"account_id": account_id,
"region": region,
"body": json.loads(msg.body)}
self._sensor_service.dispatch(trigger="aws.sqs_new_message", payload=payload)
msg.delete()

Expand All @@ -89,7 +109,7 @@ def _get_config_entry(self, key, prefix=None):
''' Get configuration values either from Datastore or config file. '''
config = self.config
if prefix:
config = self._config.get(prefix, {})
config = self.config.get(prefix, {})

value = self._sensor_service.get_value('aws.%s' % (key), local=False)
if not value:
Expand All @@ -101,61 +121,156 @@ def _get_config_entry(self, key, prefix=None):
return value

def _may_setup_sqs(self):
queues = self._get_config_entry(key='input_queues', prefix='sqs_sensor')
self.access_key_id = self._get_config_entry('aws_access_key_id')
self.secret_access_key = self._get_config_entry('aws_secret_access_key')
self.aws_region = self._get_config_entry('region')
self.max_number_of_messages = self._get_config_entry('max_number_of_messages',
prefix='sqs_other')

if not self.account_id:
self._setup_session()

queues = self._get_config_entry(key='input_queues', prefix='sqs_sensor')
# XXX: This is a hack as from datastore we can only receive a string while
# from config.yaml we can receive a list
if isinstance(queues, six.string_types):
self.input_queues = [x.strip() for x in queues.split(',')]
self.input_queues = [six.moves.urllib.parse.urlparse(x.strip()) for x in
queues.split(',')]
elif isinstance(queues, list):
self.input_queues = queues
self.input_queues = [six.moves.urllib.parse.urlparse(queue) for queue in queues]
else:
self.input_queues = []

self.aws_access_key = self._get_config_entry('aws_access_key_id')
self.aws_secret_key = self._get_config_entry('aws_secret_access_key')
self.aws_region = self._get_config_entry('region')

self.max_number_of_messages = self._get_config_entry('max_number_of_messages',
prefix='sqs_other')

# checker configuration is update, or not
def _is_same_credentials():
c = self.session.get_credentials()
def _is_same_credentials(session, account_id):
if not session:
return False

c = session.get_credentials()
return c is not None and \
c.access_key == self.aws_access_key and \
c.secret_key == self.aws_secret_key and \
self.session.region_name == self.aws_region
c.access_key == self.credentials[account_id][0] and \
c.secret_key == self.credentials[account_id][1] and \
(account_id == self.account_id or c.token == self.credentials[account_id][2])

# Build a map between 'account_id' and its 'role arn' by parsing the matching config entry
self.cross_roles = {
arn.split(':')[4]: arn
for arn in self._get_config_entry('roles', 'sqs_sensor') or []
}
required_accounts = {self._get_info(queue)[0] for queue in self.input_queues}

if self.session is None or not _is_same_credentials():
self._setup_sqs()
for account_id in required_accounts:
if account_id != self.account_id and account_id not in self.cross_roles:
continue

def _setup_sqs(self):
''' Setup Boto3 structures '''
self._logger.debug('Setting up SQS resources')
self.session = Session(aws_access_key_id=self.aws_access_key,
aws_secret_access_key=self.aws_secret_key,
region_name=self.aws_region)
session = self.sessions.get(account_id)
if not _is_same_credentials(session, account_id):
if account_id == self.account_id:
self._setup_session()
else:
self._setup_multiaccount_session(account_id)

def _setup_session(self):
''' Setup Boto3 session '''
session = Session(aws_access_key_id=self.access_key_id,
aws_secret_access_key=self.secret_access_key)

if not self.account_id:
self.account_id = session.client('sts').get_caller_identity().get('Account')
self.credentials[self.account_id] = (self.access_key_id, self.secret_access_key, None)

self.sessions[self.account_id] = session
self.sqs_res.pop(self.account_id, None)

def _setup_multiaccount_session(self, account_id):
''' Assume role and setup session for the cross-account capability'''
try:
assumed_role = self.sessions[self.account_id].client('sts').assume_role(
RoleArn=self.cross_roles[account_id],
RoleSessionName='StackStormEvents'
)
except ClientError:
self._logger.error('Could not assume role on %s', account_id)
return

self.credentials[account_id] = (assumed_role["Credentials"]["AccessKeyId"],
assumed_role["Credentials"]["SecretAccessKey"],
assumed_role["Credentials"]["SessionToken"])

session = Session(
aws_access_key_id=self.credentials[account_id][0],
aws_secret_access_key=self.credentials[account_id][1],
aws_session_token=self.credentials[account_id][2]
)
self.sessions[account_id] = session
self.sqs_res.pop(account_id, None)

def _setup_sqs(self, session, account_id, region):
''' Setup SQS resources'''
if region in self.sqs_res[account_id]:
return self.sqs_res[account_id][region]

try:
self.sqs_res = self.session.resource('sqs')
self.sqs_res[account_id][region] = session.resource('sqs', region_name=region)
return self.sqs_res[account_id][region]
except NoRegionError:
self._logger.warning("The specified region '%s' is invalid", self.aws_region)
self._logger.error("The specified region '%s' for account %s is invalid.",
region, account_id)

def _get_info(self, queue):
''' Retrieve the account ID and region from the queue URL '''
# Pull default values from previous configuration
account_id = self.account_id
aws_region = self.aws_region

# Netloc will be empty if the queue is just a name
if queue.netloc:
try:
account_id = queue.path.split('/')[1]
except IndexError as e:
six.reraise(type(e), type(e)(
"Queue URL must contain the account ID as the first part of the path, "
"eg: https://sqs.<aws_region>.amazonaws.com/<account_id>/<queue_name>"),
sys.exc_info()[2])
else:
self._logger.debug("Using %s as account_id", account_id)

try:
aws_region = queue.netloc.split('.')[1]
except IndexError as e:
six.reraise(type(e), type(e)(
"Queue URL must contain the AWS region, "
"eg: https://sqs.<aws_region>.amazonaws.com/..."),
sys.exc_info()[2])
else:
self._logger.debug("Using %s as the AWS region", aws_region)

return account_id, aws_region

def _get_queue(self, queue, account_id, region):
''' Fetch QUEUE by its name or URL and create new one if queue doesn't exist '''
if account_id not in self.sessions:
self._logger.error('Session for account id %s does not exist', account_id)
return None

sqs_res = self._setup_sqs(self.sessions[account_id], account_id, region)
if sqs_res is None:
return None

def _get_queue_by_name(self, queueName):
''' Fetch QUEUE by it's name create new one if queue doesn't exist '''
try:
return self.sqs_res.get_queue_by_name(QueueName=queueName)
if queue.netloc:
return sqs_res.Queue(six.moves.urllib.parse.urlunparse(queue))
return sqs_res.get_queue_by_name(QueueName=queue.path.split('/')[-1])
except ClientError as e:
if e.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue':
self._logger.warning("SQS Queue: %s doesn't exist, creating it.", queueName)
return self.sqs_res.create_queue(QueueName=queueName)
self._logger.warning("SQS Queue: %s doesn't exist, creating it.", queue)
return sqs_res.create_queue(QueueName=queue.path.split('/')[-1])
elif e.response['Error']['Code'] == 'InvalidClientTokenId':
self._logger.warning("Cloudn't operate sqs because of invalid credential config")
self._logger.warning("Couldn't operate sqs because of invalid credential config")
else:
raise
except NoCredentialsError:
self._logger.warning("Cloudn't operate sqs because of invalid credential config")
self._logger.warning("Couldn't operate sqs because of invalid credential config")
except EndpointConnectionError as e:
self._logger.warning(e)

Expand Down
4 changes: 4 additions & 0 deletions sensors/sqs_sensor.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,9 @@
properties:
queue:
type: "string"
account_id:
type: "string"
region:
type: "string"
body:
type: "object"
3 changes: 2 additions & 1 deletion tests/fixtures/full.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ service_notifications_sensor:
path: "/my-path"

sqs_sensor:
input_queues: "input_queue"
input_queues:
- "input_queue"

sqs_other:
max_number_of_messages: 1
20 changes: 20 additions & 0 deletions tests/fixtures/mixed.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
---
aws_access_key_id: "access_key_id"
aws_secret_access_key: "secret_key"
region: "us-west-1"
st2_user_data: ""

service_notifications_sensor:
host: "localhost"
port: 12345
path: "/my-path"

sqs_sensor:
roles:
- "arn:aws:iam::345678901223:role/rolename1"
input_queues:
- "input_queue"
- "https://sqs.us-east-1.amazonaws.com/345678901223/queue_name_2"

sqs_other:
max_number_of_messages: 1
19 changes: 19 additions & 0 deletions tests/fixtures/multiaccount.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
aws_access_key_id: "access_key_id"
aws_secret_access_key: "secret_key"
region: "us-west-1"
st2_user_data: ""

service_notifications_sensor:
host: "localhost"
port: 12345
path: "/my-path"

sqs_sensor:
roles:
- "arn:aws:iam::345678901223:role/rolename1"
input_queues:
- "https://sqs.us-east-1.amazonaws.com/345678901223/queue_name_2"

sqs_other:
max_number_of_messages: 1
Loading