From 28a8d90587c3a13db37236a2e3dac92fbe5d4447 Mon Sep 17 00:00:00 2001 From: Lakshmi Kannan Date: Tue, 16 Dec 2014 02:27:20 +0000 Subject: [PATCH 01/12] WIP: Timer should not be sensor --- st2api/st2api/controllers/v1/webhooks.py | 4 +- st2common/st2common/constants/triggers.py | 166 +++++++++++++++- .../contrib/sensors/st2_timer_sensor.py | 188 ++++-------------- 3 files changed, 210 insertions(+), 148 deletions(-) diff --git a/st2api/st2api/controllers/v1/webhooks.py b/st2api/st2api/controllers/v1/webhooks.py index 1c1e2cf46e..ad6269c101 100644 --- a/st2api/st2api/controllers/v1/webhooks.py +++ b/st2api/st2api/controllers/v1/webhooks.py @@ -24,7 +24,7 @@ from urlparse import urljoin from st2common import log as logging -from st2common.constants.triggers import GENERIC_WEBHOOK_TRIGGER_REF +from st2common.constants.triggers import WEBHOOK_TRIGGER_TYPES from st2common.models.base import jsexpose from st2common.services.triggerwatcher import TriggerWatcher from st2common.transport.reactor import TriggerDispatcher @@ -39,7 +39,7 @@ def __init__(self, *args, **kwargs): super(WebhooksController, self).__init__(*args, **kwargs) self._hooks = {} self._base_url = '/webhooks/' - self._trigger_types = [GENERIC_WEBHOOK_TRIGGER_REF] + self._trigger_types = WEBHOOK_TRIGGER_TYPES.keys() self._trigger_dispatcher = TriggerDispatcher(LOG) self._trigger_watcher = TriggerWatcher(create_handler=self._handle_create_trigger, diff --git a/st2common/st2common/constants/triggers.py b/st2common/st2common/constants/triggers.py index 3e9437aa0c..a230afb295 100644 --- a/st2common/st2common/constants/triggers.py +++ b/st2common/st2common/constants/triggers.py @@ -13,4 +13,168 @@ # See the License for the specific language governing permissions and # limitations under the License. -GENERIC_WEBHOOK_TRIGGER_REF = 'core.st2.webhook' +from st2common.constants.pack import SYSTEM_PACK_NAME +from st2common.models.system.common import ResourceReference + + +WEBHOOKS_PARAMETERS_SCHEMA = { + 'type': 'object', + 'properties': { + 'url': { + 'type': 'string' + } + }, + 'required': [ + 'url' + ], + 'additionalProperties': False +} + + +WEBHOOKS_PAYLOAD_SCHEMA = { + 'type': 'object' +} + +WEBHOOK_TRIGGER_TYPES = { + ResourceReference.to_string_reference(SYSTEM_PACK_NAME, 'st2.webhook'): { + 'name': 'st2.webhook', + 'pack': SYSTEM_PACK_NAME, + 'description': 'Trigger type for registering webhooks that can consume' + + ' arbitrary payload.', + 'parameters_schema': WEBHOOKS_PARAMETERS_SCHEMA, + 'payload_schema': WEBHOOKS_PAYLOAD_SCHEMA + } +} + +# Timer specs + +INTERVAL_PARAMETERS_SCHEMA = { + "type": "object", + "properties": { + "timezone": { + "type": "string" + }, + "unit": { + "enum": ["weeks", "days", "hours", "minutes", "seconds"] + }, + "delta": { + "type": "integer" + } + }, + "required": [ + "unit", + "delta" + ], + "additionalProperties": False +} + +DATE_PARAMETERS_SCHEMA = { + "type": "object", + "properties": { + "timezone": { + "type": "string" + }, + "date": { + "type": "string", + "format": "date-time" + } + }, + "required": [ + "date" + ], + "additionalProperties": False +} + +CRON_PARAMETERS_SCHEMA = { + "type": "object", + "properties": { + "timezone": { + "type": "string" + }, + "year": { + "type": "integer" + }, + "month": { + "type": "integer", + "minimum": 1, + "maximum": 12 + }, + "day": { + "type": "integer", + "minimum": 1, + "maximum": 31 + }, + "week": { + "type": "integer", + "minimum": 1, + "maximum": 53 + }, + "day_of_week": { + "type": "integer", + "minimum": 0, + "maximum": 6 + }, + "hour": { + "type": "integer", + "minimum": 0, + "maximum": 23 + }, + "minute": { + "type": "integer", + "minimum": 0, + "maximum": 59 + }, + "second": { + "type": "integer", + "minimum": 0, + "maximum": 59 + } + }, + "additionalProperties": False +} + +TIMER_PAYLOAD_SCHEMA = { + "type": "object", + "properties": { + "executed_at": { + "type": "string", + "format": "date-time", + "default": "2014-07-30 05:04:24.578325" + }, + "schedule": { + "type": "object", + "default": { + "delta": 30, + "units": "seconds" + } + } + } +} + +TIMER_TRIGGER_TYPES = { + ResourceReference.to_string_reference(SYSTEM_PACK_NAME, 'st2.IntervalTimer'): { + 'name': 'st2.IntervalTimer', + 'pack': SYSTEM_PACK_NAME, + 'description': 'Triggers on specified intervals. e.g. every 30s, 1week etc.', + 'payload_schema': TIMER_PAYLOAD_SCHEMA, + 'parameters_schema': INTERVAL_PARAMETERS_SCHEMA + }, + ResourceReference.to_string_reference(SYSTEM_PACK_NAME, 'st2.DateTimer'): { + 'name': 'st2.DateTimer', + 'pack': SYSTEM_PACK_NAME, + 'description': 'Triggers exactly once when the current time matches the specified time. ' + 'e.g. timezone:UTC date:2014-12-31 23:59:59.', + 'payload_schema': TIMER_PAYLOAD_SCHEMA, + 'parameters_schema': DATE_PARAMETERS_SCHEMA + }, + ResourceReference.to_string_reference(SYSTEM_PACK_NAME, 'st2.CronTimer'): { + 'name': 'st2.CronTimer', + 'pack': SYSTEM_PACK_NAME, + 'description': 'Triggers whenever current time matches the specified time constaints like ' + 'a UNIX cron scheduler.', + 'payload_schema': TIMER_PAYLOAD_SCHEMA, + 'parameters_schema': CRON_PARAMETERS_SCHEMA + } +} + +SYSTEM_TRIGGER_TYPES = dict(WEBHOOK_TRIGGER_TYPES.items() + TIMER_TRIGGER_TYPES.items()) diff --git a/st2reactor/st2reactor/contrib/sensors/st2_timer_sensor.py b/st2reactor/st2reactor/contrib/sensors/st2_timer_sensor.py index fa63540179..94500706e0 100644 --- a/st2reactor/st2reactor/contrib/sensors/st2_timer_sensor.py +++ b/st2reactor/st2reactor/contrib/sensors/st2_timer_sensor.py @@ -24,156 +24,31 @@ import dateutil.parser as date_parser import jsonschema -from st2common.constants.pack import SYSTEM_PACK_NAME -from st2common.models.system.common import ResourceReference -from st2reactor.sensor.base import Sensor +from st2common import log as logging +from st2common.constants.triggers import TIMER_TRIGGER_TYPES +from st2common.services.triggerwatcher import TriggerWatcher +from st2common.transport.reactor import TriggerDispatcher - -INTERVAL_PARAMETERS_SCHEMA = { - "type": "object", - "properties": { - "timezone": { - "type": "string" - }, - "unit": { - "enum": ["weeks", "days", "hours", "minutes", "seconds"] - }, - "delta": { - "type": "integer" - } - }, - "required": [ - "unit", - "delta" - ], - "additionalProperties": False -} - -DATE_PARAMETERS_SCHEMA = { - "type": "object", - "properties": { - "timezone": { - "type": "string" - }, - "date": { - "type": "string", - "format": "date-time" - } - }, - "required": [ - "date" - ], - "additionalProperties": False -} - -CRON_PARAMETERS_SCHEMA = { - "type": "object", - "properties": { - "timezone": { - "type": "string" - }, - "year": { - "type": "integer" - }, - "month": { - "type": "integer", - "minimum": 1, - "maximum": 12 - }, - "day": { - "type": "integer", - "minimum": 1, - "maximum": 31 - }, - "week": { - "type": "integer", - "minimum": 1, - "maximum": 53 - }, - "day_of_week": { - "type": "integer", - "minimum": 0, - "maximum": 6 - }, - "hour": { - "type": "integer", - "minimum": 0, - "maximum": 23 - }, - "minute": { - "type": "integer", - "minimum": 0, - "maximum": 59 - }, - "second": { - "type": "integer", - "minimum": 0, - "maximum": 59 - } - }, - "additionalProperties": False -} - -PAYLOAD_SCHEMA = { - "type": "object", - "properties": { - "executed_at": { - "type": "string", - "format": "date-time", - "default": "2014-07-30 05:04:24.578325" - }, - "schedule": { - "type": "object", - "default": { - "delta": 30, - "units": "seconds" - } - } - } -} - -TRIGGER_TYPES = { - ResourceReference.to_string_reference(SYSTEM_PACK_NAME, 'st2.IntervalTimer'): { - 'name': 'st2.IntervalTimer', - 'pack': SYSTEM_PACK_NAME, - 'description': 'Triggers on specified intervals. e.g. every 30s, 1week etc.', - 'payload_schema': PAYLOAD_SCHEMA, - 'parameters_schema': INTERVAL_PARAMETERS_SCHEMA - }, - ResourceReference.to_string_reference(SYSTEM_PACK_NAME, 'st2.DateTimer'): { - 'name': 'st2.DateTimer', - 'pack': SYSTEM_PACK_NAME, - 'description': 'Triggers exactly once when the current time matches the specified time. ' - 'e.g. timezone:UTC date:2014-12-31 23:59:59.', - 'payload_schema': PAYLOAD_SCHEMA, - 'parameters_schema': DATE_PARAMETERS_SCHEMA - }, - ResourceReference.to_string_reference(SYSTEM_PACK_NAME, 'st2.CronTimer'): { - 'name': 'st2.CronTimer', - 'pack': SYSTEM_PACK_NAME, - 'description': 'Triggers whenever current time matches the specified time constaints like ' - 'a UNIX cron scheduler.', - 'payload_schema': PAYLOAD_SCHEMA, - 'parameters_schema': CRON_PARAMETERS_SCHEMA - } -} +LOG = logging.getLogger(__name__) -class St2TimerSensor(Sensor): +class St2Timer(object): ''' A timer sensor that uses APScheduler 3.0. ''' def __init__(self, sensor_service=None): self._timezone = 'America/Los_Angeles' # Whatever TZ local box runs in. - self._sensor_service = sensor_service - self._log = self._sensor_service.get_logger(self.__class__.__name__) self._scheduler = BlockingScheduler(timezone=self._timezone) self._jobs = {} - - def setup(self): - pass - - def run(self): + self._trigger_types = TIMER_TRIGGER_TYPES.keys() + self._trigger_watcher = TriggerWatcher(create_handler=self._handle_create_trigger, + update_handler=self._handle_update_trigger, + delete_handler=self._handle_delete_trigger, + trigger_types=self._trigger_types, + queue_suffix='timers') + self._trigger_dispatcher = TriggerDispatcher(LOG) + + def start(self): self._scheduler.start() def cleanup(self): @@ -197,12 +72,9 @@ def remove_trigger(self, trigger): self._scheduler.remove_job(job_id) - def _get_trigger_type(self, ref): - pass - def _add_job_to_scheduler(self, trigger): trigger_type_ref = trigger['type'] - trigger_type = TRIGGER_TYPES[trigger_type_ref] + trigger_type = TIMER_TRIGGER_TYPES[trigger_type_ref] try: jsonschema.validate(trigger['parameters'], trigger_type['parameters_schema']) @@ -255,4 +127,30 @@ def _emit_trigger_instance(self, trigger): 'executed_at': str(datetime.utcnow()), 'schedule': trigger['parameters'].get('time') } - self._sensor_service.dispatch(trigger, payload) + self._trigger_dispatcher.dispatch(trigger, payload) + + ############################################## + # Event handler methods for the trigger events + ############################################## + + def _handle_create_trigger(self, trigger): + LOG.debug('Calling "add_trigger" method (trigger.type=%s)' % (trigger.type)) + trigger = self._sanitize_trigger(trigger=trigger) + self.add_trigger(trigger=trigger) + + def _handle_update_trigger(self, trigger): + LOG.debug('Calling "update_trigger" method (trigger.type=%s)' % (trigger.type)) + trigger = self._sanitize_trigger(trigger=trigger) + self.update_trigger(trigger=trigger) + + def _handle_delete_trigger(self, trigger): + LOG.debug('Calling "remove_trigger" method (trigger.type=%s)' % (trigger.type)) + trigger = self._sanitize_trigger(trigger=trigger) + self.remove_trigger(trigger=trigger) + + def _sanitize_trigger(self, trigger): + sanitized = trigger._data + if 'id' in sanitized: + # Friendly objectid rather than the MongoEngine representation. + sanitized['id'] = str(sanitized['id']) + return sanitized From 5ce0b93614a6f88811f692883fc183248823c63b Mon Sep 17 00:00:00 2001 From: Lakshmi Kannan Date: Tue, 16 Dec 2014 02:32:03 +0000 Subject: [PATCH 02/12] Fix log statements --- .../contrib/sensors/st2_timer_sensor.py | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/st2reactor/st2reactor/contrib/sensors/st2_timer_sensor.py b/st2reactor/st2reactor/contrib/sensors/st2_timer_sensor.py index 94500706e0..fc3b88d785 100644 --- a/st2reactor/st2reactor/contrib/sensors/st2_timer_sensor.py +++ b/st2reactor/st2reactor/contrib/sensors/st2_timer_sensor.py @@ -33,9 +33,9 @@ class St2Timer(object): - ''' - A timer sensor that uses APScheduler 3.0. - ''' + """ + A timer interface that uses APScheduler 3.0. + """ def __init__(self, sensor_service=None): self._timezone = 'America/Los_Angeles' # Whatever TZ local box runs in. self._scheduler = BlockingScheduler(timezone=self._timezone) @@ -67,7 +67,7 @@ def remove_trigger(self, trigger): try: job_id = self._jobs[id] except KeyError: - self._log.info('Job not found: %s', id) + LOG.info('Job not found: %s', id) return self._scheduler.remove_job(job_id) @@ -79,8 +79,8 @@ def _add_job_to_scheduler(self, trigger): jsonschema.validate(trigger['parameters'], trigger_type['parameters_schema']) except jsonschema.ValidationError as e: - self._log.error('Exception scheduling timer: %s, %s', - trigger['parameters'], e, exc_info=True) + LOG.error('Exception scheduling timer: %s, %s', + trigger['parameters'], e, exc_info=True) raise # Or should we just return? time_spec = trigger['parameters'] @@ -103,8 +103,8 @@ def _add_job_to_scheduler(self, trigger): time_type = CronTrigger(**cron) if hasattr(time_type, 'run_date') and datetime.now(tzutc()) > time_type.run_date: - self._log.warning('Not scheduling expired timer: %s : %s', - trigger['parameters'], time_type.run_date) + LOG.warning('Not scheduling expired timer: %s : %s', + trigger['parameters'], time_type.run_date) else: self._add_job(trigger, time_type) @@ -114,14 +114,14 @@ def _add_job(self, trigger, time_type, replace=True): trigger=time_type, args=[trigger], replace_existing=replace) - self._log.info('Job %s scheduled.', job.id) + LOG.info('Job %s scheduled.', job.id) self._jobs[trigger['id']] = job.id except Exception as e: - self._log.error('Exception scheduling timer: %s, %s', - trigger['parameters'], e, exc_info=True) + LOG.error('Exception scheduling timer: %s, %s', + trigger['parameters'], e, exc_info=True) def _emit_trigger_instance(self, trigger): - self._log.info('Timer fired at: %s. Trigger: %s', str(datetime.utcnow()), trigger) + LOG.info('Timer fired at: %s. Trigger: %s', str(datetime.utcnow()), trigger) payload = { 'executed_at': str(datetime.utcnow()), From d9e14b9d7fa41127bedbd430dabf417d4866df06 Mon Sep 17 00:00:00 2001 From: Lakshmi Kannan Date: Thu, 18 Dec 2014 00:14:27 +0000 Subject: [PATCH 03/12] Fix: Refactor container utils to use services/triggers --- st2common/st2common/services/triggers.py | 45 ++++++++++-- st2reactor/st2reactor/container/utils.py | 87 +++++------------------- st2reactor/tests/test_container_utils.py | 36 ++++------ 3 files changed, 69 insertions(+), 99 deletions(-) diff --git a/st2common/st2common/services/triggers.py b/st2common/st2common/services/triggers.py index a37afb591b..7813b45e6d 100644 --- a/st2common/st2common/services/triggers.py +++ b/st2common/st2common/services/triggers.py @@ -14,9 +14,9 @@ # limitations under the License. from st2common import log as logging -from st2common.models.api.reactor import TriggerAPI +from st2common.models.api.reactor import (TriggerAPI, TriggerTypeAPI) from st2common.models.system.common import ResourceReference -from st2common.persistence.reactor import Trigger +from st2common.persistence.reactor import (Trigger, TriggerType) LOG = logging.getLogger(__name__) @@ -44,9 +44,7 @@ def get_trigger_db(trigger): # TODO: This method should die in a fire if isinstance(trigger, str) or isinstance(trigger, unicode): # Assume reference was passed in - ref_obj = ResourceReference.from_string_reference(ref=trigger) - return _get_trigger_db_by_name_and_pack(name=ref_obj.name, - pack=ref_obj.pack) + return Trigger.get_by_ref(trigger) if isinstance(trigger, dict): name = trigger.get('name', None) pack = trigger.get('pack', None) @@ -73,6 +71,23 @@ def get_trigger_db(trigger): raise Exception('Unrecognized object') +def get_trigger_type_db(ref): + """ + Returns the trigger type object from db given a string ref. + + :param ref: Reference to the trigger type db object. + :type ref: ``str`` + + :rtype trigger_type: ``object`` + """ + try: + return TriggerType.get_by_ref(ref) + except ValueError as e: + LOG.debug('Database lookup for ref="%s" resulted ' + + 'in exception : %s.', ref, e, exc_info=True) + return None + + def _get_trigger_api_given_rule(rule): trigger = rule.trigger triggertype_ref = ResourceReference.from_string_reference(trigger.get('type')) @@ -103,3 +118,23 @@ def create_trigger_db(trigger): def create_trigger_db_from_rule(rule): trigger_api = _get_trigger_api_given_rule(rule) return create_trigger_db(trigger_api) + + +def create_trigger_type_db(trigger_type): + """ + Creates a trigger type db object in the db given trigger_type definition as dict. + + :param trigger_type: Trigger type model. + :type trigger_type: ``dict`` + + :rtype: ``object`` + """ + trigger_type_api = TriggerTypeAPI(**trigger_type) + ref = ResourceReference.to_string_reference(name=trigger_type_api.name, + pack=trigger_type_api.pack) + trigger_type_db = get_trigger_type_db(ref) + if not trigger_type_db: + trigger_type_db = TriggerTypeAPI.to_model(trigger_type_api) + LOG.debug('verified trigger and formulated TriggerDB=%s', trigger_type_db) + trigger_type_db = TriggerType.add_or_update(trigger_type_db) + return trigger_type_db diff --git a/st2reactor/st2reactor/container/utils.py b/st2reactor/st2reactor/container/utils.py index e904e4607d..66032c903c 100644 --- a/st2reactor/st2reactor/container/utils.py +++ b/st2reactor/st2reactor/container/utils.py @@ -17,8 +17,8 @@ from st2common import log as logging from st2common.exceptions.sensors import TriggerTypeRegistrationException -from st2common.persistence.reactor import SensorType, TriggerType, TriggerInstance -from st2common.models.db.reactor import SensorTypeDB, TriggerTypeDB, TriggerInstanceDB +from st2common.persistence.reactor import SensorType, TriggerInstance +from st2common.models.db.reactor import SensorTypeDB, TriggerInstanceDB from st2common.services import triggers as TriggerService from st2common.constants.pack import SYSTEM_PACK_NAME from st2common.constants.sensors import MINIMUM_POLL_INTERVAL @@ -49,33 +49,8 @@ def create_trigger_instance(trigger, payload, occurrence_time): return TriggerInstance.add_or_update(trigger_instance) -def _create_trigger_type(pack, name, description=None, payload_schema=None, - parameters_schema=None): - triggertypes = TriggerType.query(pack=pack, name=name) - is_update = False - if len(triggertypes) > 0: - trigger_type = triggertypes[0] - LOG.debug('Found existing trigger id:%s with name:%s. Will update ' - 'trigger.', trigger_type.id, name) - is_update = True - else: - trigger_type = TriggerTypeDB() - - trigger_type.pack = pack - trigger_type.name = name - trigger_type.description = description - trigger_type.payload_schema = payload_schema - trigger_type.parameters_schema = parameters_schema - try: - triggertype_db = TriggerType.add_or_update(trigger_type) - except: - LOG.exception('Validation failed for TriggerType=%s.', trigger_type) - raise TriggerTypeRegistrationException('Invalid TriggerType name=%s.' % name) - if is_update: - LOG.audit('TriggerType updated. TriggerType=%s', triggertype_db) - else: - LOG.audit('TriggerType created. TriggerType=%s', triggertype_db) - return triggertype_db +def _create_trigger_type(trigger_type): + return TriggerService.create_trigger_type_db(trigger_type) def _validate_trigger_type(trigger_type): @@ -89,67 +64,39 @@ def _validate_trigger_type(trigger_type): raise TriggerTypeRegistrationException('Invalid trigger type. Missing field %s' % field) -def _create_trigger(pack, trigger_type): +def _create_trigger(trigger_type): if hasattr(trigger_type, 'parameters_schema') and not trigger_type['parameters_schema']: - trigger_db = TriggerService.get_trigger_db({'pack': pack, - 'name': trigger_type.name}) - - if trigger_db is None: - trigger_dict = { - 'name': trigger_type.name, - 'pack': pack, - 'type': trigger_type.get_reference().ref - } - - try: - trigger_db = TriggerService.create_trigger_db(trigger_dict) - except: - LOG.exception('Validation failed for Trigger=%s.', trigger_dict) - raise TriggerTypeRegistrationException( - 'Unable to create Trigger for TriggerType=%s.' % trigger_type.name) - else: - return trigger_db + trigger_dict = { + 'name': trigger_type.name, + 'pack': trigger_type.pack, + 'type': trigger_type.get_reference().ref + } + return TriggerService.create_trigger_db(trigger_dict) else: LOG.debug('Won\'t create Trigger object as TriggerType %s expects ' + 'parameters.', trigger_type) return None -def _add_trigger_models(pack, trigger_type): - description = trigger_type['description'] if 'description' in trigger_type else '' - payload_schema = trigger_type['payload_schema'] if 'payload_schema' in trigger_type else {} - parameters_schema = trigger_type['parameters_schema'] \ - if 'parameters_schema' in trigger_type else {} - - trigger_type = _create_trigger_type( - pack=pack, - name=trigger_type['name'], - description=description, - payload_schema=payload_schema, - parameters_schema=parameters_schema - ) - trigger = _create_trigger(pack=pack, - trigger_type=trigger_type) +def _add_trigger_models(trigger_type): + trigger_type = _create_trigger_type(trigger_type) + trigger = _create_trigger(trigger_type=trigger_type) return (trigger_type, trigger) -def add_trigger_models(pack, trigger_types): +def add_trigger_models(trigger_types): """ Register trigger types. - :param pack: Content pack those triggers belong to. - :type pack: ``str`` - :param trigger_types: A list of triggers to register. - :type trigger_types: ``list`` of ``tuple`` (trigger_type, trigger) + :type trigger_types: ``list`` of trigger_type. """ [r for r in (_validate_trigger_type(trigger_type) for trigger_type in trigger_types) if r is not None] result = [] for trigger_type in trigger_types: - item = _add_trigger_models(pack=pack, - trigger_type=trigger_type) + item = _add_trigger_models(trigger_type=trigger_type) if item: result.append(item) diff --git a/st2reactor/tests/test_container_utils.py b/st2reactor/tests/test_container_utils.py index c1a8e3cc47..7fbfbdb64f 100644 --- a/st2reactor/tests/test_container_utils.py +++ b/st2reactor/tests/test_container_utils.py @@ -15,18 +15,18 @@ import mock -from st2common.persistence.reactor import Trigger, TriggerType -from st2common.models.db.reactor import TriggerDB, TriggerTypeDB +from st2common.persistence.reactor import TriggerType +from st2common.models.db.reactor import TriggerDB from st2common.transport.publishers import PoolPublisher import st2reactor.container.utils as container_utils from st2tests.base import DbTestCase -MOCK_TRIGGER_TYPE = TriggerTypeDB() -MOCK_TRIGGER_TYPE.id = 'trigger-type-test.id' -MOCK_TRIGGER_TYPE.name = 'trigger-type-test.name' -MOCK_TRIGGER_TYPE.pack = 'dummy_pack_1' -MOCK_TRIGGER_TYPE.parameters_schema = {} -MOCK_TRIGGER_TYPE.payload_info = {} +MOCK_TRIGGER_TYPE = {} +MOCK_TRIGGER_TYPE['id'] = 'trigger-type-test.id' +MOCK_TRIGGER_TYPE['name'] = 'trigger-type-test.name' +MOCK_TRIGGER_TYPE['pack'] = 'dummy_pack_1' +MOCK_TRIGGER_TYPE['parameters_schema'] = {} +MOCK_TRIGGER_TYPE['payload_schema'] = {} MOCK_TRIGGER = TriggerDB() MOCK_TRIGGER.id = 'trigger-test.id' @@ -38,16 +38,6 @@ @mock.patch.object(PoolPublisher, 'publish', mock.MagicMock()) class ContainerUtilsTest(DbTestCase): - @mock.patch.object(TriggerType, 'query', mock.MagicMock( - return_value=[MOCK_TRIGGER_TYPE])) - @mock.patch.object(Trigger, 'get_by_name', mock.MagicMock(return_value=MOCK_TRIGGER)) - @mock.patch.object(TriggerType, 'add_or_update') - def test_add_trigger(self, mock_add_handler): - mock_add_handler.return_value = MOCK_TRIGGER_TYPE - container_utils.add_trigger_models(pack='dummy_pack_1', - trigger_types=[MOCK_TRIGGER_TYPE]) - self.assertTrue(mock_add_handler.called, 'trigger not added.') - def test_add_trigger_type(self): """ This sensor has misconfigured trigger type. We shouldn't explode. @@ -89,12 +79,11 @@ def test_add_trigger_type_no_params(self): 'parameters_schema': {}, 'payload_schema': {} } - trigtype_dbs = container_utils.add_trigger_models(pack='my_pack_1', - trigger_types=[trig_type]) + trigtype_dbs = container_utils.add_trigger_models(trigger_types=[trig_type]) trigger_type, trigger = trigtype_dbs[0] trigtype_db = TriggerType.get_by_id(trigger_type.id) - self.assertEqual(trigtype_db.pack, 'my_pack_1') + self.assertEqual(trigtype_db.pack, 'dummy_pack_1') self.assertEqual(trigtype_db.name, trig_type.get('name')) self.assertTrue(trigger is not None) self.assertEqual(trigger.name, trigtype_db.name) @@ -112,13 +101,12 @@ def test_add_trigger_type_with_params(self): } trig_type = { 'name': 'myawesometriggertype2', - 'pack': 'dummy_pack_1', + 'pack': 'my_pack_1', 'description': 'Words cannot describe how awesome I am.', 'parameters_schema': PARAMETERS_SCHEMA, 'payload_schema': {} } - trigtype_dbs = container_utils.add_trigger_models(pack='my_pack_1', - trigger_types=[trig_type]) + trigtype_dbs = container_utils.add_trigger_models(trigger_types=[trig_type]) trigger_type, trigger = trigtype_dbs[0] trigtype_db = TriggerType.get_by_id(trigger_type.id) From f2172ee563255b0d9b94818b898ac4183f509232 Mon Sep 17 00:00:00 2001 From: Lakshmi Kannan Date: Thu, 18 Dec 2014 00:15:19 +0000 Subject: [PATCH 04/12] Fix: Register webhook trigger types from webhooks controller --- st2api/st2api/controllers/v1/webhooks.py | 7 ++++++- st2api/tests/controllers/v1/test_webhooks.py | 4 ++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/st2api/st2api/controllers/v1/webhooks.py b/st2api/st2api/controllers/v1/webhooks.py index ad6269c101..749e993e81 100644 --- a/st2api/st2api/controllers/v1/webhooks.py +++ b/st2api/st2api/controllers/v1/webhooks.py @@ -26,9 +26,9 @@ from st2common import log as logging from st2common.constants.triggers import WEBHOOK_TRIGGER_TYPES from st2common.models.base import jsexpose +import st2common.services.triggers as trigger_service from st2common.services.triggerwatcher import TriggerWatcher from st2common.transport.reactor import TriggerDispatcher - http_client = six.moves.http_client LOG = logging.getLogger(__name__) @@ -48,6 +48,7 @@ def __init__(self, *args, **kwargs): trigger_types=self._trigger_types, queue_suffix='webhooks') self._trigger_watcher.start() + self._register_webhook_trigger_types() @jsexpose() def get_all(self): @@ -99,6 +100,10 @@ def _is_valid_hook(self, hook): def _get_trigger_for_hook(self, hook): return self._hooks[hook] + def _register_webhook_trigger_types(self): + for trigger_type in WEBHOOK_TRIGGER_TYPES.values(): + trigger_service.create_trigger_type_db(trigger_type) + def add_trigger(self, trigger): url = trigger['parameters']['url'] LOG.info('Listening to endpoint: %s', urljoin(self._base_url, url)) diff --git a/st2api/tests/controllers/v1/test_webhooks.py b/st2api/tests/controllers/v1/test_webhooks.py index 78ee5aae77..7a8c29a3e3 100644 --- a/st2api/tests/controllers/v1/test_webhooks.py +++ b/st2api/tests/controllers/v1/test_webhooks.py @@ -18,7 +18,7 @@ from tests import FunctionalTest from st2api.controllers.v1.webhooks import WebhooksController -from st2common.constants.triggers import GENERIC_WEBHOOK_TRIGGER_REF +from st2common.constants.triggers import WEBHOOK_TRIGGER_TYPES from st2common.models.db.reactor import TriggerDB from st2common.transport.reactor import TriggerInstancePublisher @@ -37,7 +37,7 @@ } DUMMY_TRIGGER = TriggerDB(name='pr-merged', pack='git') -DUMMY_TRIGGER.type = GENERIC_WEBHOOK_TRIGGER_REF +DUMMY_TRIGGER.type = WEBHOOK_TRIGGER_TYPES.keys()[0] class TestTriggerTypeController(FunctionalTest): From ac720c4ea49288c30b26b79e34005a793895b5b8 Mon Sep 17 00:00:00 2001 From: Lakshmi Kannan Date: Thu, 18 Dec 2014 00:15:51 +0000 Subject: [PATCH 05/12] Timer implementation is not a sensor anymore --- st2reactor/st2reactor/timer/__init__.py | 0 st2reactor/st2reactor/timer/base.py | 161 ++++++++++++++++++++++++ 2 files changed, 161 insertions(+) create mode 100644 st2reactor/st2reactor/timer/__init__.py create mode 100644 st2reactor/st2reactor/timer/base.py diff --git a/st2reactor/st2reactor/timer/__init__.py b/st2reactor/st2reactor/timer/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/st2reactor/st2reactor/timer/base.py b/st2reactor/st2reactor/timer/base.py new file mode 100644 index 0000000000..fb53c7eced --- /dev/null +++ b/st2reactor/st2reactor/timer/base.py @@ -0,0 +1,161 @@ +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime + +from apscheduler.schedulers.background import BlockingScheduler +from apscheduler.triggers.cron import CronTrigger +from apscheduler.triggers.date import DateTrigger +from apscheduler.triggers.interval import IntervalTrigger +import apscheduler.util as aps_utils +from dateutil.tz import tzutc +import dateutil.parser as date_parser +import jsonschema + +from st2common import log as logging +from st2common.constants.triggers import TIMER_TRIGGER_TYPES +from st2common.services.triggerwatcher import TriggerWatcher +from st2common.transport.reactor import TriggerDispatcher +import st2reactor.container.utils as container_utils + +LOG = logging.getLogger(__name__) + + +class St2Timer(object): + """ + A timer interface that uses APScheduler 3.0. + """ + def __init__(self, local_timezone=None): + self._timezone = local_timezone + self._scheduler = BlockingScheduler(timezone=self._timezone) + self._jobs = {} + self._trigger_types = TIMER_TRIGGER_TYPES.keys() + self._trigger_watcher = TriggerWatcher(create_handler=self._handle_create_trigger, + update_handler=self._handle_update_trigger, + delete_handler=self._handle_delete_trigger, + trigger_types=self._trigger_types, + queue_suffix='timers') + self._trigger_dispatcher = TriggerDispatcher(LOG) + + def start(self): + self._register_timer_trigger_types() + self._scheduler.start() + + def cleanup(self): + self._scheduler.shutdown(wait=True) + + def add_trigger(self, trigger): + self._add_job_to_scheduler(trigger) + + def update_trigger(self, trigger): + self.remove_trigger(trigger) + self.add_trigger(trigger) + + def remove_trigger(self, trigger): + id = trigger['id'] + + try: + job_id = self._jobs[id] + except KeyError: + LOG.info('Job not found: %s', id) + return + + self._scheduler.remove_job(job_id) + + def _add_job_to_scheduler(self, trigger): + trigger_type_ref = trigger['type'] + trigger_type = TIMER_TRIGGER_TYPES[trigger_type_ref] + try: + jsonschema.validate(trigger['parameters'], + trigger_type['parameters_schema']) + except jsonschema.ValidationError as e: + LOG.error('Exception scheduling timer: %s, %s', + trigger['parameters'], e, exc_info=True) + raise # Or should we just return? + + time_spec = trigger['parameters'] + time_zone = aps_utils.astimezone(trigger['parameters'].get('timezone')) + + time_type = None + + if trigger_type['name'] == 'st2.IntervalTimer': + unit = time_spec.get('unit', None) + value = time_spec.get('delta', None) + time_type = IntervalTrigger(**{unit: value, 'timezone': time_zone}) + elif trigger_type['name'] == 'st2.DateTimer': + # Raises an exception if date string isn't a valid one. + dat = date_parser.parse(time_spec.get('date', None)) + time_type = DateTrigger(dat, timezone=time_zone) + elif trigger_type['name'] == 'st2.CronTimer': + cron = time_spec.copy() + cron['timezone'] = time_zone + + time_type = CronTrigger(**cron) + + if hasattr(time_type, 'run_date') and datetime.now(tzutc()) > time_type.run_date: + LOG.warning('Not scheduling expired timer: %s : %s', + trigger['parameters'], time_type.run_date) + else: + self._add_job(trigger, time_type) + + def _add_job(self, trigger, time_type, replace=True): + try: + job = self._scheduler.add_job(self._emit_trigger_instance, + trigger=time_type, + args=[trigger], + replace_existing=replace) + LOG.info('Job %s scheduled.', job.id) + self._jobs[trigger['id']] = job.id + except Exception as e: + LOG.error('Exception scheduling timer: %s, %s', + trigger['parameters'], e, exc_info=True) + + def _emit_trigger_instance(self, trigger): + LOG.info('Timer fired at: %s. Trigger: %s', str(datetime.utcnow()), trigger) + + payload = { + 'executed_at': str(datetime.utcnow()), + 'schedule': trigger['parameters'].get('time') + } + self._trigger_dispatcher.dispatch(trigger, payload) + + def _register_timer_trigger_types(self): + return container_utils.add_trigger_models(TIMER_TRIGGER_TYPES.values()) + + ############################################## + # Event handler methods for the trigger events + ############################################## + + def _handle_create_trigger(self, trigger): + LOG.debug('Calling "add_trigger" method (trigger.type=%s)' % (trigger.type)) + trigger = self._sanitize_trigger(trigger=trigger) + self.add_trigger(trigger=trigger) + + def _handle_update_trigger(self, trigger): + LOG.debug('Calling "update_trigger" method (trigger.type=%s)' % (trigger.type)) + trigger = self._sanitize_trigger(trigger=trigger) + self.update_trigger(trigger=trigger) + + def _handle_delete_trigger(self, trigger): + LOG.debug('Calling "remove_trigger" method (trigger.type=%s)' % (trigger.type)) + trigger = self._sanitize_trigger(trigger=trigger) + self.remove_trigger(trigger=trigger) + + def _sanitize_trigger(self, trigger): + sanitized = trigger._data + if 'id' in sanitized: + # Friendly objectid rather than the MongoEngine representation. + sanitized['id'] = str(sanitized['id']) + return sanitized From 030a0562f9a240a9ee089531b3123a9427a4cbec Mon Sep 17 00:00:00 2001 From: Lakshmi Kannan Date: Thu, 18 Dec 2014 00:16:24 +0000 Subject: [PATCH 06/12] Spin timer up as part of rules_engine process --- st2reactor/st2reactor/cmd/rulesengine.py | 4 ++++ st2reactor/st2reactor/rules/config.py | 6 ++++++ 2 files changed, 10 insertions(+) diff --git a/st2reactor/st2reactor/cmd/rulesengine.py b/st2reactor/st2reactor/cmd/rulesengine.py index c4e194d455..b3b248fb08 100644 --- a/st2reactor/st2reactor/cmd/rulesengine.py +++ b/st2reactor/st2reactor/cmd/rulesengine.py @@ -9,6 +9,7 @@ from st2common.constants.logging import DEFAULT_LOGGING_CONF_PATH from st2reactor.rules import config from st2reactor.rules import worker +from st2reactor.timer.base import St2Timer LOG = logging.getLogger('st2reactor.bin.rulesengine') @@ -37,8 +38,10 @@ def _teardown(): def main(): + timer = St2Timer(local_timezone=cfg.CONF.timer.local_timezone) try: _setup() + timer.start() return worker.work() except SystemExit as exit_code: sys.exit(exit_code) @@ -46,4 +49,5 @@ def main(): LOG.exception('(PID:%s) RulesEngine quit due to exception.', os.getpid()) return 1 finally: + timer.cleanup() _teardown() diff --git a/st2reactor/st2reactor/rules/config.py b/st2reactor/st2reactor/rules/config.py index 50d2e7237f..8900b8d323 100644 --- a/st2reactor/st2reactor/rules/config.py +++ b/st2reactor/st2reactor/rules/config.py @@ -33,6 +33,12 @@ def _register_rules_engine_opts(): ] CONF.register_opts(logging_opts, group='rulesengine') + timer_opts = [ + cfg.StrOpt('local_timezone', default='America/Los_Angeles', + help='Timezone pertaining to the location where st2 is run.') + ] + CONF.register_opts(timer_opts, group='timer') + def register_opts(): _register_common_opts() From ab89579974d28c9eea0de1be75815b8cf161d477 Mon Sep 17 00:00:00 2001 From: Lakshmi Kannan Date: Thu, 18 Dec 2014 00:16:49 +0000 Subject: [PATCH 07/12] Fix: Remove core sensors registration from sensorsregistrar --- st2reactor/st2reactor/bootstrap/sensorsregistrar.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/st2reactor/st2reactor/bootstrap/sensorsregistrar.py b/st2reactor/st2reactor/bootstrap/sensorsregistrar.py index 461f7c6aca..dc1ebbd1e2 100644 --- a/st2reactor/st2reactor/bootstrap/sensorsregistrar.py +++ b/st2reactor/st2reactor/bootstrap/sensorsregistrar.py @@ -31,8 +31,6 @@ LOG = logging.getLogger(__name__) PATH = os.path.join(os.path.dirname(os.path.realpath(__file__))) -SYSTEM_SENSORS_PATH = os.path.join(PATH, '../contrib/sensors') -SYSTEM_SENSORS_PATH = os.path.abspath(SYSTEM_SENSORS_PATH) class SensorsRegistrar(object): @@ -66,8 +64,7 @@ def _register_sensor_from_pack(self, pack, sensor): poll_interval = metadata.get('poll_interval', None) # Add TrigerType models to the DB - trigger_type_dbs = container_utils.add_trigger_models(pack=pack, - trigger_types=trigger_types) + trigger_type_dbs = container_utils.add_trigger_models(trigger_types=trigger_types) # Populate a list of references belonging to this sensor trigger_type_refs = [] @@ -94,10 +91,6 @@ def register_sensors_from_packs(self, base_dir): pack_loader = ContentPackLoader() dirs = pack_loader.get_content(base_dir=base_dir, content_type='sensors') - # Add system sensors to the core pack - dirs['core'] = {} - dirs['core'] = SYSTEM_SENSORS_PATH - for pack, sensors_dir in six.iteritems(dirs): try: LOG.info('Registering sensors from pack: %s', pack) From 22d4c3fa14051c111a1232cc1c2e94169d763202 Mon Sep 17 00:00:00 2001 From: Lakshmi Kannan Date: Thu, 18 Dec 2014 00:17:12 +0000 Subject: [PATCH 08/12] Get rid of contrib folder in st2reactor --- st2reactor/st2reactor/contrib/__init__.py | 0 .../st2reactor/contrib/sensors/__init__.py | 3 - .../sensors/st2_generic_webhook_sensor.yaml | 16 -- .../contrib/sensors/st2_timer_sensor.py | 156 ------------------ .../contrib/sensors/st2_timer_sensor.yaml | 118 ------------- .../contrib/sensors/st2_webhook_sensor.py | 131 --------------- .../st2webhookschemas/st2webhooks.json | 21 --- 7 files changed, 445 deletions(-) delete mode 100644 st2reactor/st2reactor/contrib/__init__.py delete mode 100644 st2reactor/st2reactor/contrib/sensors/__init__.py delete mode 100644 st2reactor/st2reactor/contrib/sensors/st2_generic_webhook_sensor.yaml delete mode 100644 st2reactor/st2reactor/contrib/sensors/st2_timer_sensor.py delete mode 100644 st2reactor/st2reactor/contrib/sensors/st2_timer_sensor.yaml delete mode 100644 st2reactor/st2reactor/contrib/sensors/st2_webhook_sensor.py delete mode 100644 st2reactor/st2reactor/contrib/sensors/st2webhookschemas/st2webhooks.json diff --git a/st2reactor/st2reactor/contrib/__init__.py b/st2reactor/st2reactor/contrib/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/st2reactor/st2reactor/contrib/sensors/__init__.py b/st2reactor/st2reactor/contrib/sensors/__init__.py deleted file mode 100644 index 933346ddea..0000000000 --- a/st2reactor/st2reactor/contrib/sensors/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -""" -Module contains StackStorm built sensor samples -""" diff --git a/st2reactor/st2reactor/contrib/sensors/st2_generic_webhook_sensor.yaml b/st2reactor/st2reactor/contrib/sensors/st2_generic_webhook_sensor.yaml deleted file mode 100644 index 1bcd27625f..0000000000 --- a/st2reactor/st2reactor/contrib/sensors/st2_generic_webhook_sensor.yaml +++ /dev/null @@ -1,16 +0,0 @@ ---- - description: "Trigger type for generic webhooks." - trigger_types: - - - name: "st2.webhook" - description: "Triggers on received webhook" - payload_schema: - type: "object" - parameters_schema: - additionalProperties: false - required: - - "url" - type: "object" - properties: - url: - type: "string" diff --git a/st2reactor/st2reactor/contrib/sensors/st2_timer_sensor.py b/st2reactor/st2reactor/contrib/sensors/st2_timer_sensor.py deleted file mode 100644 index fc3b88d785..0000000000 --- a/st2reactor/st2reactor/contrib/sensors/st2_timer_sensor.py +++ /dev/null @@ -1,156 +0,0 @@ -# Licensed to the StackStorm, Inc ('StackStorm') under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from datetime import datetime - -from apscheduler.schedulers.background import BlockingScheduler -from apscheduler.triggers.cron import CronTrigger -from apscheduler.triggers.date import DateTrigger -from apscheduler.triggers.interval import IntervalTrigger -import apscheduler.util as aps_utils -from dateutil.tz import tzutc -import dateutil.parser as date_parser -import jsonschema - -from st2common import log as logging -from st2common.constants.triggers import TIMER_TRIGGER_TYPES -from st2common.services.triggerwatcher import TriggerWatcher -from st2common.transport.reactor import TriggerDispatcher - -LOG = logging.getLogger(__name__) - - -class St2Timer(object): - """ - A timer interface that uses APScheduler 3.0. - """ - def __init__(self, sensor_service=None): - self._timezone = 'America/Los_Angeles' # Whatever TZ local box runs in. - self._scheduler = BlockingScheduler(timezone=self._timezone) - self._jobs = {} - self._trigger_types = TIMER_TRIGGER_TYPES.keys() - self._trigger_watcher = TriggerWatcher(create_handler=self._handle_create_trigger, - update_handler=self._handle_update_trigger, - delete_handler=self._handle_delete_trigger, - trigger_types=self._trigger_types, - queue_suffix='timers') - self._trigger_dispatcher = TriggerDispatcher(LOG) - - def start(self): - self._scheduler.start() - - def cleanup(self): - self._scheduler.shutdown(wait=True) - - def add_trigger(self, trigger): - self._add_job_to_scheduler(trigger) - - def update_trigger(self, trigger): - self.remove_trigger(trigger) - self.add_trigger(trigger) - - def remove_trigger(self, trigger): - id = trigger['id'] - - try: - job_id = self._jobs[id] - except KeyError: - LOG.info('Job not found: %s', id) - return - - self._scheduler.remove_job(job_id) - - def _add_job_to_scheduler(self, trigger): - trigger_type_ref = trigger['type'] - trigger_type = TIMER_TRIGGER_TYPES[trigger_type_ref] - try: - jsonschema.validate(trigger['parameters'], - trigger_type['parameters_schema']) - except jsonschema.ValidationError as e: - LOG.error('Exception scheduling timer: %s, %s', - trigger['parameters'], e, exc_info=True) - raise # Or should we just return? - - time_spec = trigger['parameters'] - time_zone = aps_utils.astimezone(trigger['parameters'].get('timezone')) - - time_type = None - - if trigger_type['name'] == 'st2.IntervalTimer': - unit = time_spec.get('unit', None) - value = time_spec.get('delta', None) - time_type = IntervalTrigger(**{unit: value, 'timezone': time_zone}) - elif trigger_type['name'] == 'st2.DateTimer': - # Raises an exception if date string isn't a valid one. - dat = date_parser.parse(time_spec.get('date', None)) - time_type = DateTrigger(dat, timezone=time_zone) - elif trigger_type['name'] == 'st2.CronTimer': - cron = time_spec.copy() - cron['timezone'] = time_zone - - time_type = CronTrigger(**cron) - - if hasattr(time_type, 'run_date') and datetime.now(tzutc()) > time_type.run_date: - LOG.warning('Not scheduling expired timer: %s : %s', - trigger['parameters'], time_type.run_date) - else: - self._add_job(trigger, time_type) - - def _add_job(self, trigger, time_type, replace=True): - try: - job = self._scheduler.add_job(self._emit_trigger_instance, - trigger=time_type, - args=[trigger], - replace_existing=replace) - LOG.info('Job %s scheduled.', job.id) - self._jobs[trigger['id']] = job.id - except Exception as e: - LOG.error('Exception scheduling timer: %s, %s', - trigger['parameters'], e, exc_info=True) - - def _emit_trigger_instance(self, trigger): - LOG.info('Timer fired at: %s. Trigger: %s', str(datetime.utcnow()), trigger) - - payload = { - 'executed_at': str(datetime.utcnow()), - 'schedule': trigger['parameters'].get('time') - } - self._trigger_dispatcher.dispatch(trigger, payload) - - ############################################## - # Event handler methods for the trigger events - ############################################## - - def _handle_create_trigger(self, trigger): - LOG.debug('Calling "add_trigger" method (trigger.type=%s)' % (trigger.type)) - trigger = self._sanitize_trigger(trigger=trigger) - self.add_trigger(trigger=trigger) - - def _handle_update_trigger(self, trigger): - LOG.debug('Calling "update_trigger" method (trigger.type=%s)' % (trigger.type)) - trigger = self._sanitize_trigger(trigger=trigger) - self.update_trigger(trigger=trigger) - - def _handle_delete_trigger(self, trigger): - LOG.debug('Calling "remove_trigger" method (trigger.type=%s)' % (trigger.type)) - trigger = self._sanitize_trigger(trigger=trigger) - self.remove_trigger(trigger=trigger) - - def _sanitize_trigger(self, trigger): - sanitized = trigger._data - if 'id' in sanitized: - # Friendly objectid rather than the MongoEngine representation. - sanitized['id'] = str(sanitized['id']) - return sanitized diff --git a/st2reactor/st2reactor/contrib/sensors/st2_timer_sensor.yaml b/st2reactor/st2reactor/contrib/sensors/st2_timer_sensor.yaml deleted file mode 100644 index bf0402e2ff..0000000000 --- a/st2reactor/st2reactor/contrib/sensors/st2_timer_sensor.yaml +++ /dev/null @@ -1,118 +0,0 @@ ---- - class_name: "St2TimerSensor" - entry_point: "st2_timer_sensor.py" - description: "Sensor which emits triggers based on the user-defined schedule or interval" - trigger_types: - - - name: "st2.IntervalTimer" - description: "Triggers on specified intervals. e.g. every 30s, 1week etc." - payload_schema: - type: "object" - properties: - executed_at: - type: "string" - format: "date-time" - default: "2014-07-30 05:04:24.578325" - schedule: - type: "object" - default: - delta: 30 - units: "seconds" - type: "object" - parameters_schema: - additionalProperties: false - required: - - "unit" - - "delta" - type: "object" - properties: - timezone: - type: "string" - unit: - enum: - - "weeks" - - "days" - - "hours" - - "minutes" - - "seconds" - delta: - type: "integer" - - - name: "st2.DateTimer" - description: "Triggers exactly once when the current time matches the specified time. e.g. timezone:UTC date:2014-12-31 23:59:59." - payload_schema: - type: "object" - properties: - executed_at: - type: "string" - format: "date-time" - default: "2014-07-30 05:04:24.578325" - schedule: - type: "object" - default: - delta: 30 - units: "seconds" - type: "object" - parameters_schema: - additionalProperties: false - required: - - "date" - type: "object" - properties: - date: - type: "string" - format: "date-time" - timezone: - type: "string" - - - name: "st2.CronTimer" - description: "Triggers whenever current time matches the specified time constaints like a UNIX cron scheduler." - payload_schema: - type: "object" - properties: - executed_at: - type: "string" - format: "date-time" - default: "2014-07-30 05:04:24.578325" - schedule: - type: "object" - default: - delta: 30 - units: "seconds" - type: "object" - parameters_schema: - additionalProperties: false - type: "object" - properties: - week: - minimum: 1 - type: "integer" - maximum: 53 - second: - minimum: 0 - type: "integer" - maximum: 59 - minute: - minimum: 0 - type: "integer" - maximum: 59 - hour: - minimum: 0 - type: "integer" - maximum: 23 - year: - type: "integer" - timezone: - type: "string" - day: - minimum: 1 - type: "integer" - maximum: 31 - day_of_week: - minimum: 0 - type: "integer" - maximum: 6 - month: - minimum: 1 - type: "integer" - maximum: 12 diff --git a/st2reactor/st2reactor/contrib/sensors/st2_webhook_sensor.py b/st2reactor/st2reactor/contrib/sensors/st2_webhook_sensor.py deleted file mode 100644 index b6ba116a63..0000000000 --- a/st2reactor/st2reactor/contrib/sensors/st2_webhook_sensor.py +++ /dev/null @@ -1,131 +0,0 @@ -# Licensed to the StackStorm, Inc ('StackStorm') under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from functools import wraps -import os -import six - -from flask import (jsonify, request, Flask) -import flask_jsonschema -from oslo.config import cfg - -from st2common import log as logging -from st2common.models.system.common import ResourceReference, InvalidResourceReferenceError -from st2reactor.sensor.base import Sensor - -http_client = six.moves.http_client - -LOG = logging.getLogger(__name__) -HOST = cfg.CONF.st2_webhook_sensor.host -PORT = cfg.CONF.st2_webhook_sensor.port -BASE_URL = cfg.CONF.st2_webhook_sensor.url - - -# Dectorators for request validations. -def validate_json(f): - @wraps(f) - def wrapper(*args, **kw): - try: - request.json - except Exception: - msg = 'Content-Type must be application/json.' - return jsonify({'error': msg}), http_client.BAD_REQUEST - return f(*args, **kw) - return wrapper - - -class St2WebhookSensor(Sensor): - ''' - A webhook sensor using a micro-framework Flask. - ''' - # Flask specific stuff. - _app = Flask('st2_webhook_sensor') - _app.config['JSONSCHEMA_DIR'] = os.path.join(_app.root_path, 'st2webhookschemas') - - jsonschema = flask_jsonschema.JsonSchema(_app) - - @_app.errorhandler(flask_jsonschema.ValidationError) - def on_validation_error(e): - data = {'error': str(e)} - js = jsonify(data) - return js, http_client.BAD_REQUEST - - def __init__(self, dispatcher): - self._dispatcher = dispatcher - self._log = self._dispatcher.get_logger(self.__class__.__name__) - self._host = HOST - self._port = PORT - - def setup(self): - self._setup_flask_app() - - def run(self): - St2WebhookSensor._app.run(port=self._port, host=self._host) - - def cleanup(self): - # If Flask is using the default Werkzeug server, then call shutdown on it. - func = request.environ.get('werkzeug.server.shutdown') - if func is None: - raise RuntimeError('Not running with the Werkzeug Server') - func() - - def add_trigger(self, trigger): - pass - - def update_trigger(self, trigger): - pass - - def remove_trigger(self, trigger): - pass - - @validate_json - @flask_jsonschema.validate('st2webhooks', 'create') - def _handle_webhook(self): - body = request.get_json() - - try: - trigger, payload = self._to_trigger(body) - except KeyError as e: - self._log.exception('Exception %s handling webhook', e) - return jsonify({'invalid': str(e)}), http_client.BAD_REQUEST - - try: - self._dispatcher.dispatch(trigger, payload) - except Exception as e: - self._log.exception('Exception %s handling webhook', e) - status = http_client.INTERNAL_SERVER_ERROR - return jsonify({'error': str(e)}), status - - return jsonify({}), http_client.ACCEPTED - - # Flask app specific stuff. - def _setup_flask_app(self): - St2WebhookSensor._app.add_url_rule(BASE_URL, 'st2webhooks', self._handle_webhook, - methods=['POST']) - - def _to_trigger(self, body): - trigger = body.get('trigger', '') - trigger_ref = None - try: - trigger_ref = ResourceReference.from_string_reference(ref=trigger) - except InvalidResourceReferenceError: - LOG.debug('Unable to parse reference.', exc_info=True) - - return { - 'name': trigger_ref.name if trigger_ref else None, - 'pack': trigger_ref.pack if trigger_ref else None, - 'type': body.get('type', ''), - 'parameters': {} - }, body['payload'] diff --git a/st2reactor/st2reactor/contrib/sensors/st2webhookschemas/st2webhooks.json b/st2reactor/st2reactor/contrib/sensors/st2webhookschemas/st2webhooks.json deleted file mode 100644 index 6d0ab8d493..0000000000 --- a/st2reactor/st2reactor/contrib/sensors/st2webhookschemas/st2webhooks.json +++ /dev/null @@ -1,21 +0,0 @@ -{ - "create": { - "type": "object", - "properties": { - "trigger": { - "description": "Reference to the Trigger. Can use reference to a parameter-less TriggerType.", - "type": "string" - }, - "type": { - "description": "Type of the Trigger i.e. the TriggerType. Used with a parameterized TriggerType where the payload would further clarify the Trigger.", - "type": "string", - "default": "" - }, - "payload": { - "description": "Payload as described at time of TriggerType registration.", - "type": "object" - } - }, - "required": ["payload"] - } -} From aa12fb5ae8939bd2a3cc84d97fc177a904c672b2 Mon Sep 17 00:00:00 2001 From: Lakshmi Kannan Date: Thu, 18 Dec 2014 20:06:27 +0000 Subject: [PATCH 09/12] Spawn scheduler and worker in eventlets (they are blocking) --- st2reactor/st2reactor/cmd/rulesengine.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/st2reactor/st2reactor/cmd/rulesengine.py b/st2reactor/st2reactor/cmd/rulesengine.py index b3b248fb08..378e1e821e 100644 --- a/st2reactor/st2reactor/cmd/rulesengine.py +++ b/st2reactor/st2reactor/cmd/rulesengine.py @@ -1,6 +1,7 @@ import os import sys +import eventlet from oslo.config import cfg from st2common import log as logging @@ -37,12 +38,21 @@ def _teardown(): db_teardown() +def _kickoff_rules_worker(worker): + worker.work() + + +def _kickoff_timer(timer): + timer.start() + + def main(): timer = St2Timer(local_timezone=cfg.CONF.timer.local_timezone) try: _setup() - timer.start() - return worker.work() + timer_thread = eventlet.spawn(_kickoff_timer, timer) + worker_thread = eventlet.spawn(_kickoff_rules_worker, worker) + return (timer_thread.wait() and worker_thread.wait()) except SystemExit as exit_code: sys.exit(exit_code) except: From c7d51ec92cc16323266fc2963bacdac615aca220 Mon Sep 17 00:00:00 2001 From: Lakshmi Kannan Date: Thu, 18 Dec 2014 23:15:59 +0000 Subject: [PATCH 10/12] Add test cases for timer --- st2reactor/st2reactor/timer/base.py | 8 +++- st2reactor/tests/test_timer.py | 74 +++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 2 deletions(-) create mode 100644 st2reactor/tests/test_timer.py diff --git a/st2reactor/st2reactor/timer/base.py b/st2reactor/st2reactor/timer/base.py index fb53c7eced..d83cb163ba 100644 --- a/st2reactor/st2reactor/timer/base.py +++ b/st2reactor/st2reactor/timer/base.py @@ -57,7 +57,10 @@ def cleanup(self): self._scheduler.shutdown(wait=True) def add_trigger(self, trigger): - self._add_job_to_scheduler(trigger) + try: + self._add_job_to_scheduler(trigger) + except: + LOG.exception('Unable to add timer for trigger: %s', trigger) def update_trigger(self, trigger): self.remove_trigger(trigger) @@ -111,6 +114,8 @@ def _add_job_to_scheduler(self, trigger): self._add_job(trigger, time_type) def _add_job(self, trigger, time_type, replace=True): + if self._jobs.get(trigger['id'], None): + raise Exception('Should not try to register timer for trigger %s' % trigger) try: job = self._scheduler.add_job(self._emit_trigger_instance, trigger=time_type, @@ -127,7 +132,6 @@ def _emit_trigger_instance(self, trigger): payload = { 'executed_at': str(datetime.utcnow()), - 'schedule': trigger['parameters'].get('time') } self._trigger_dispatcher.dispatch(trigger, payload) diff --git a/st2reactor/tests/test_timer.py b/st2reactor/tests/test_timer.py new file mode 100644 index 0000000000..313c083618 --- /dev/null +++ b/st2reactor/tests/test_timer.py @@ -0,0 +1,74 @@ +import copy + +import bson +import unittest2 + +from st2common.constants.pack import SYSTEM_PACK_NAME +from st2common.models.db.reactor import TriggerDB +from st2common.models.system.common import ResourceReference +from st2reactor.timer.base import St2Timer +import st2tests.config as tests_config + + +class TestDispatcher(object): + def __init__(self): + self.trigger = None + self.payload = None + + def dispatch(self, trigger, payload): + self.trigger = trigger + self.payload = payload + + def assert_payload(self): + return self.payload is not None and self.payload.get('executed_at', None) + + +class TimerTest(unittest2.TestCase): + test_trigger = None + + @classmethod + def setUpClass(cls): + super(TimerTest, cls).setUpClass() + tests_config.parse_args() + parameters = {} + parameters['unit'] = 'seconds' + parameters['delta'] = 30 + ref = ResourceReference.to_string_reference(SYSTEM_PACK_NAME, 'st2.IntervalTimer') + TimerTest.test_trigger = TriggerDB(name='testtimer', pack='test', parameters=parameters, + type=ref) + TimerTest.test_trigger.id = str(bson.ObjectId()) + + def test_add_remove_timer_trigger(self): + timer = St2Timer(local_timezone='America/Los_Angeles') + self.assertTrue(len(timer._scheduler.get_jobs()) == 0) + timer.add_trigger(TimerTest.test_trigger) + self.assertTrue(len(timer._scheduler.get_jobs()) == 1) + timer.remove_trigger(TimerTest.test_trigger) + self.assertTrue(len(timer._scheduler.get_jobs()) == 0) + + def test_emit_trigger_instance(self): + timer = St2Timer(local_timezone='America/Los_Angeles') + mock_dispatcher = TestDispatcher() + setattr(timer, '_trigger_dispatcher', mock_dispatcher) + timer._emit_trigger_instance(TimerTest.test_trigger) + self.assertTrue(mock_dispatcher.assert_payload()) + + def test_invalid_schema_timer(self): + timer = St2Timer(local_timezone='America/Los_Angeles') + fail_timer = copy.copy(TimerTest.test_trigger) + del fail_timer.parameters['unit'] + timer.add_trigger(fail_timer) + self.assertTrue(len(timer._scheduler.get_jobs()) == 0) + + def test_duplicate_timer_trigger(self): + timer = St2Timer(local_timezone='America/Los_Angeles') + self.assertTrue(len(timer._scheduler.get_jobs()) == 0) + timer.add_trigger(TimerTest.test_trigger) + self.assertTrue(len(timer._scheduler.get_jobs()) == 1) + try: + timer.add_trigger(TimerTest.test_trigger) + except: + self.assertTrue(len(timer._scheduler.get_jobs()) == 1) + pass + timer.remove_trigger(TimerTest.test_trigger) + self.assertTrue(len(timer._scheduler.get_jobs()) == 0) From a6c530134e97f6e555fea2fff27efd5d399e0902 Mon Sep 17 00:00:00 2001 From: Lakshmi Kannan Date: Thu, 18 Dec 2014 23:38:23 +0000 Subject: [PATCH 11/12] Add end-to-end test case --- st2reactor/tests/test_timer.py | 38 +++++++++++++++++++++++++++++++--- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/st2reactor/tests/test_timer.py b/st2reactor/tests/test_timer.py index 313c083618..551898865f 100644 --- a/st2reactor/tests/test_timer.py +++ b/st2reactor/tests/test_timer.py @@ -1,12 +1,29 @@ +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import copy import bson -import unittest2 +import eventlet +import mock from st2common.constants.pack import SYSTEM_PACK_NAME from st2common.models.db.reactor import TriggerDB from st2common.models.system.common import ResourceReference from st2reactor.timer.base import St2Timer +from st2tests.base import EventletTestCase import st2tests.config as tests_config @@ -23,7 +40,7 @@ def assert_payload(self): return self.payload is not None and self.payload.get('executed_at', None) -class TimerTest(unittest2.TestCase): +class TimerTest(EventletTestCase): test_trigger = None @classmethod @@ -32,7 +49,7 @@ def setUpClass(cls): tests_config.parse_args() parameters = {} parameters['unit'] = 'seconds' - parameters['delta'] = 30 + parameters['delta'] = 1 ref = ResourceReference.to_string_reference(SYSTEM_PACK_NAME, 'st2.IntervalTimer') TimerTest.test_trigger = TriggerDB(name='testtimer', pack='test', parameters=parameters, type=ref) @@ -72,3 +89,18 @@ def test_duplicate_timer_trigger(self): pass timer.remove_trigger(TimerTest.test_trigger) self.assertTrue(len(timer._scheduler.get_jobs()) == 0) + + @mock.patch.object(St2Timer, '_register_timer_trigger_types', mock.MagicMock()) + def test_timer_end_to_end(self): + timer = St2Timer(local_timezone='America/Los_Angeles') + mock_dispatcher = TestDispatcher() + setattr(timer, '_trigger_dispatcher', mock_dispatcher) + timer.add_trigger(TimerTest.test_trigger) + + def kickoff_timer(timer): + timer.start() + + eventlet.spawn(kickoff_timer, timer) + eventlet.sleep(2) + self.assertTrue(mock_dispatcher.assert_payload()) + timer.cleanup() From fe5332ba2ef6e92a5780e5d0b7268bd171956e95 Mon Sep 17 00:00:00 2001 From: Lakshmi Kannan Date: Thu, 18 Dec 2014 23:45:25 +0000 Subject: [PATCH 12/12] assert that mock dispatcher is not called before timer start --- st2reactor/tests/test_timer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/st2reactor/tests/test_timer.py b/st2reactor/tests/test_timer.py index 551898865f..b7052cd3e6 100644 --- a/st2reactor/tests/test_timer.py +++ b/st2reactor/tests/test_timer.py @@ -101,6 +101,7 @@ def kickoff_timer(timer): timer.start() eventlet.spawn(kickoff_timer, timer) + self.assertFalse(mock_dispatcher.assert_payload()) eventlet.sleep(2) self.assertTrue(mock_dispatcher.assert_payload()) timer.cleanup()