diff --git a/st2api/st2api/controllers/v1/webhooks.py b/st2api/st2api/controllers/v1/webhooks.py index 1c1e2cf46e..749e993e81 100644 --- a/st2api/st2api/controllers/v1/webhooks.py +++ b/st2api/st2api/controllers/v1/webhooks.py @@ -24,11 +24,11 @@ from urlparse import urljoin from st2common import log as logging -from st2common.constants.triggers import GENERIC_WEBHOOK_TRIGGER_REF +from st2common.constants.triggers import WEBHOOK_TRIGGER_TYPES from st2common.models.base import jsexpose +import st2common.services.triggers as trigger_service from st2common.services.triggerwatcher import TriggerWatcher from st2common.transport.reactor import TriggerDispatcher - http_client = six.moves.http_client LOG = logging.getLogger(__name__) @@ -39,7 +39,7 @@ def __init__(self, *args, **kwargs): super(WebhooksController, self).__init__(*args, **kwargs) self._hooks = {} self._base_url = '/webhooks/' - self._trigger_types = [GENERIC_WEBHOOK_TRIGGER_REF] + self._trigger_types = WEBHOOK_TRIGGER_TYPES.keys() self._trigger_dispatcher = TriggerDispatcher(LOG) self._trigger_watcher = TriggerWatcher(create_handler=self._handle_create_trigger, @@ -48,6 +48,7 @@ def __init__(self, *args, **kwargs): trigger_types=self._trigger_types, queue_suffix='webhooks') self._trigger_watcher.start() + self._register_webhook_trigger_types() @jsexpose() def get_all(self): @@ -99,6 +100,10 @@ def _is_valid_hook(self, hook): def _get_trigger_for_hook(self, hook): return self._hooks[hook] + def _register_webhook_trigger_types(self): + for trigger_type in WEBHOOK_TRIGGER_TYPES.values(): + trigger_service.create_trigger_type_db(trigger_type) + def add_trigger(self, trigger): url = trigger['parameters']['url'] LOG.info('Listening to endpoint: %s', urljoin(self._base_url, url)) diff --git a/st2api/tests/unit/controllers/v1/test_webhooks.py b/st2api/tests/unit/controllers/v1/test_webhooks.py index 78ee5aae77..7a8c29a3e3 100644 --- a/st2api/tests/unit/controllers/v1/test_webhooks.py +++ b/st2api/tests/unit/controllers/v1/test_webhooks.py @@ -18,7 +18,7 @@ from tests import FunctionalTest from st2api.controllers.v1.webhooks import WebhooksController -from st2common.constants.triggers import GENERIC_WEBHOOK_TRIGGER_REF +from st2common.constants.triggers import WEBHOOK_TRIGGER_TYPES from st2common.models.db.reactor import TriggerDB from st2common.transport.reactor import TriggerInstancePublisher @@ -37,7 +37,7 @@ } DUMMY_TRIGGER = TriggerDB(name='pr-merged', pack='git') -DUMMY_TRIGGER.type = GENERIC_WEBHOOK_TRIGGER_REF +DUMMY_TRIGGER.type = WEBHOOK_TRIGGER_TYPES.keys()[0] class TestTriggerTypeController(FunctionalTest): diff --git a/st2common/st2common/constants/triggers.py b/st2common/st2common/constants/triggers.py index 3e9437aa0c..a230afb295 100644 --- a/st2common/st2common/constants/triggers.py +++ b/st2common/st2common/constants/triggers.py @@ -13,4 +13,168 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-GENERIC_WEBHOOK_TRIGGER_REF = 'core.st2.webhook' +from st2common.constants.pack import SYSTEM_PACK_NAME +from st2common.models.system.common import ResourceReference + + +WEBHOOKS_PARAMETERS_SCHEMA = { + 'type': 'object', + 'properties': { + 'url': { + 'type': 'string' + } + }, + 'required': [ + 'url' + ], + 'additionalProperties': False +} + + +WEBHOOKS_PAYLOAD_SCHEMA = { + 'type': 'object' +} + +WEBHOOK_TRIGGER_TYPES = { + ResourceReference.to_string_reference(SYSTEM_PACK_NAME, 'st2.webhook'): { + 'name': 'st2.webhook', + 'pack': SYSTEM_PACK_NAME, + 'description': 'Trigger type for registering webhooks that can consume' + + ' arbitrary payload.', + 'parameters_schema': WEBHOOKS_PARAMETERS_SCHEMA, + 'payload_schema': WEBHOOKS_PAYLOAD_SCHEMA + } +} + +# Timer specs + +INTERVAL_PARAMETERS_SCHEMA = { + "type": "object", + "properties": { + "timezone": { + "type": "string" + }, + "unit": { + "enum": ["weeks", "days", "hours", "minutes", "seconds"] + }, + "delta": { + "type": "integer" + } + }, + "required": [ + "unit", + "delta" + ], + "additionalProperties": False +} + +DATE_PARAMETERS_SCHEMA = { + "type": "object", + "properties": { + "timezone": { + "type": "string" + }, + "date": { + "type": "string", + "format": "date-time" + } + }, + "required": [ + "date" + ], + "additionalProperties": False +} + +CRON_PARAMETERS_SCHEMA = { + "type": "object", + "properties": { + "timezone": { + "type": "string" + }, + "year": { + "type": "integer" + }, + "month": { + "type": "integer", + "minimum": 1, + "maximum": 12 + }, + "day": { + "type": "integer", + "minimum": 1, + "maximum": 31 + }, + "week": { + "type": "integer", + "minimum": 1, + "maximum": 53 + }, + "day_of_week": { + "type": "integer", + "minimum": 0, + "maximum": 6 + }, + "hour": { + "type": "integer", + "minimum": 0, + "maximum": 23 + }, + "minute": { + "type": "integer", + "minimum": 0, + "maximum": 59 + }, + "second": { + "type": "integer", + "minimum": 0, + "maximum": 59 + } + }, + "additionalProperties": False +} + +TIMER_PAYLOAD_SCHEMA = { + "type": "object", + "properties": { + "executed_at": { + "type": "string", + "format": "date-time", + "default": "2014-07-30 05:04:24.578325" + }, + "schedule": { + "type": "object", + "default": { + "delta": 30, + "units": "seconds" + } + } + } +} + +TIMER_TRIGGER_TYPES = { + ResourceReference.to_string_reference(SYSTEM_PACK_NAME, 'st2.IntervalTimer'): { + 'name': 'st2.IntervalTimer', + 'pack': SYSTEM_PACK_NAME, + 'description': 'Triggers on specified intervals. e.g. every 30s, 1week etc.', + 'payload_schema': TIMER_PAYLOAD_SCHEMA, + 'parameters_schema': INTERVAL_PARAMETERS_SCHEMA + }, + ResourceReference.to_string_reference(SYSTEM_PACK_NAME, 'st2.DateTimer'): { + 'name': 'st2.DateTimer', + 'pack': SYSTEM_PACK_NAME, + 'description': 'Triggers exactly once when the current time matches the specified time. ' + 'e.g. 
timezone:UTC date:2014-12-31 23:59:59.', + 'payload_schema': TIMER_PAYLOAD_SCHEMA, + 'parameters_schema': DATE_PARAMETERS_SCHEMA + }, + ResourceReference.to_string_reference(SYSTEM_PACK_NAME, 'st2.CronTimer'): { + 'name': 'st2.CronTimer', + 'pack': SYSTEM_PACK_NAME, + 'description': 'Triggers whenever current time matches the specified time constaints like ' + 'a UNIX cron scheduler.', + 'payload_schema': TIMER_PAYLOAD_SCHEMA, + 'parameters_schema': CRON_PARAMETERS_SCHEMA + } +} + +SYSTEM_TRIGGER_TYPES = dict(WEBHOOK_TRIGGER_TYPES.items() + TIMER_TRIGGER_TYPES.items()) diff --git a/st2common/st2common/services/triggers.py b/st2common/st2common/services/triggers.py index a37afb591b..7813b45e6d 100644 --- a/st2common/st2common/services/triggers.py +++ b/st2common/st2common/services/triggers.py @@ -14,9 +14,9 @@ # limitations under the License. from st2common import log as logging -from st2common.models.api.reactor import TriggerAPI +from st2common.models.api.reactor import (TriggerAPI, TriggerTypeAPI) from st2common.models.system.common import ResourceReference -from st2common.persistence.reactor import Trigger +from st2common.persistence.reactor import (Trigger, TriggerType) LOG = logging.getLogger(__name__) @@ -44,9 +44,7 @@ def get_trigger_db(trigger): # TODO: This method should die in a fire if isinstance(trigger, str) or isinstance(trigger, unicode): # Assume reference was passed in - ref_obj = ResourceReference.from_string_reference(ref=trigger) - return _get_trigger_db_by_name_and_pack(name=ref_obj.name, - pack=ref_obj.pack) + return Trigger.get_by_ref(trigger) if isinstance(trigger, dict): name = trigger.get('name', None) pack = trigger.get('pack', None) @@ -73,6 +71,23 @@ def get_trigger_db(trigger): raise Exception('Unrecognized object') +def get_trigger_type_db(ref): + """ + Returns the trigger type object from db given a string ref. + + :param ref: Reference to the trigger type db object. + :type ref: ``str`` + + :rtype trigger_type: ``object`` + """ + try: + return TriggerType.get_by_ref(ref) + except ValueError as e: + LOG.debug('Database lookup for ref="%s" resulted ' + + 'in exception : %s.', ref, e, exc_info=True) + return None + + def _get_trigger_api_given_rule(rule): trigger = rule.trigger triggertype_ref = ResourceReference.from_string_reference(trigger.get('type')) @@ -103,3 +118,23 @@ def create_trigger_db(trigger): def create_trigger_db_from_rule(rule): trigger_api = _get_trigger_api_given_rule(rule) return create_trigger_db(trigger_api) + + +def create_trigger_type_db(trigger_type): + """ + Creates a trigger type db object in the db given trigger_type definition as dict. + + :param trigger_type: Trigger type model. 
+ :type trigger_type: ``dict`` + + :rtype: ``object`` + """ + trigger_type_api = TriggerTypeAPI(**trigger_type) + ref = ResourceReference.to_string_reference(name=trigger_type_api.name, + pack=trigger_type_api.pack) + trigger_type_db = get_trigger_type_db(ref) + if not trigger_type_db: + trigger_type_db = TriggerTypeAPI.to_model(trigger_type_api) + LOG.debug('verified trigger and formulated TriggerDB=%s', trigger_type_db) + trigger_type_db = TriggerType.add_or_update(trigger_type_db) + return trigger_type_db diff --git a/st2reactor/st2reactor/bootstrap/sensorsregistrar.py b/st2reactor/st2reactor/bootstrap/sensorsregistrar.py index 461f7c6aca..dc1ebbd1e2 100644 --- a/st2reactor/st2reactor/bootstrap/sensorsregistrar.py +++ b/st2reactor/st2reactor/bootstrap/sensorsregistrar.py @@ -31,8 +31,6 @@ LOG = logging.getLogger(__name__) PATH = os.path.join(os.path.dirname(os.path.realpath(__file__))) -SYSTEM_SENSORS_PATH = os.path.join(PATH, '../contrib/sensors') -SYSTEM_SENSORS_PATH = os.path.abspath(SYSTEM_SENSORS_PATH) class SensorsRegistrar(object): @@ -66,8 +64,7 @@ def _register_sensor_from_pack(self, pack, sensor): poll_interval = metadata.get('poll_interval', None) # Add TrigerType models to the DB - trigger_type_dbs = container_utils.add_trigger_models(pack=pack, - trigger_types=trigger_types) + trigger_type_dbs = container_utils.add_trigger_models(trigger_types=trigger_types) # Populate a list of references belonging to this sensor trigger_type_refs = [] @@ -94,10 +91,6 @@ def register_sensors_from_packs(self, base_dir): pack_loader = ContentPackLoader() dirs = pack_loader.get_content(base_dir=base_dir, content_type='sensors') - # Add system sensors to the core pack - dirs['core'] = {} - dirs['core'] = SYSTEM_SENSORS_PATH - for pack, sensors_dir in six.iteritems(dirs): try: LOG.info('Registering sensors from pack: %s', pack) diff --git a/st2reactor/st2reactor/cmd/rulesengine.py b/st2reactor/st2reactor/cmd/rulesengine.py index c4e194d455..378e1e821e 100644 --- a/st2reactor/st2reactor/cmd/rulesengine.py +++ b/st2reactor/st2reactor/cmd/rulesengine.py @@ -1,6 +1,7 @@ import os import sys +import eventlet from oslo.config import cfg from st2common import log as logging @@ -9,6 +10,7 @@ from st2common.constants.logging import DEFAULT_LOGGING_CONF_PATH from st2reactor.rules import config from st2reactor.rules import worker +from st2reactor.timer.base import St2Timer LOG = logging.getLogger('st2reactor.bin.rulesengine') @@ -36,14 +38,26 @@ def _teardown(): db_teardown() +def _kickoff_rules_worker(worker): + worker.work() + + +def _kickoff_timer(timer): + timer.start() + + def main(): + timer = St2Timer(local_timezone=cfg.CONF.timer.local_timezone) try: _setup() - return worker.work() + timer_thread = eventlet.spawn(_kickoff_timer, timer) + worker_thread = eventlet.spawn(_kickoff_rules_worker, worker) + return (timer_thread.wait() and worker_thread.wait()) except SystemExit as exit_code: sys.exit(exit_code) except: LOG.exception('(PID:%s) RulesEngine quit due to exception.', os.getpid()) return 1 finally: + timer.cleanup() _teardown() diff --git a/st2reactor/st2reactor/container/utils.py b/st2reactor/st2reactor/container/utils.py index e904e4607d..66032c903c 100644 --- a/st2reactor/st2reactor/container/utils.py +++ b/st2reactor/st2reactor/container/utils.py @@ -17,8 +17,8 @@ from st2common import log as logging from st2common.exceptions.sensors import TriggerTypeRegistrationException -from st2common.persistence.reactor import SensorType, TriggerType, TriggerInstance -from st2common.models.db.reactor 
import SensorTypeDB, TriggerTypeDB, TriggerInstanceDB +from st2common.persistence.reactor import SensorType, TriggerInstance +from st2common.models.db.reactor import SensorTypeDB, TriggerInstanceDB from st2common.services import triggers as TriggerService from st2common.constants.pack import SYSTEM_PACK_NAME from st2common.constants.sensors import MINIMUM_POLL_INTERVAL @@ -49,33 +49,8 @@ def create_trigger_instance(trigger, payload, occurrence_time): return TriggerInstance.add_or_update(trigger_instance) -def _create_trigger_type(pack, name, description=None, payload_schema=None, - parameters_schema=None): - triggertypes = TriggerType.query(pack=pack, name=name) - is_update = False - if len(triggertypes) > 0: - trigger_type = triggertypes[0] - LOG.debug('Found existing trigger id:%s with name:%s. Will update ' - 'trigger.', trigger_type.id, name) - is_update = True - else: - trigger_type = TriggerTypeDB() - - trigger_type.pack = pack - trigger_type.name = name - trigger_type.description = description - trigger_type.payload_schema = payload_schema - trigger_type.parameters_schema = parameters_schema - try: - triggertype_db = TriggerType.add_or_update(trigger_type) - except: - LOG.exception('Validation failed for TriggerType=%s.', trigger_type) - raise TriggerTypeRegistrationException('Invalid TriggerType name=%s.' % name) - if is_update: - LOG.audit('TriggerType updated. TriggerType=%s', triggertype_db) - else: - LOG.audit('TriggerType created. TriggerType=%s', triggertype_db) - return triggertype_db +def _create_trigger_type(trigger_type): + return TriggerService.create_trigger_type_db(trigger_type) def _validate_trigger_type(trigger_type): @@ -89,67 +64,39 @@ def _validate_trigger_type(trigger_type): raise TriggerTypeRegistrationException('Invalid trigger type. Missing field %s' % field) -def _create_trigger(pack, trigger_type): +def _create_trigger(trigger_type): if hasattr(trigger_type, 'parameters_schema') and not trigger_type['parameters_schema']: - trigger_db = TriggerService.get_trigger_db({'pack': pack, - 'name': trigger_type.name}) - - if trigger_db is None: - trigger_dict = { - 'name': trigger_type.name, - 'pack': pack, - 'type': trigger_type.get_reference().ref - } - - try: - trigger_db = TriggerService.create_trigger_db(trigger_dict) - except: - LOG.exception('Validation failed for Trigger=%s.', trigger_dict) - raise TriggerTypeRegistrationException( - 'Unable to create Trigger for TriggerType=%s.' 
% trigger_type.name) - else: - return trigger_db + trigger_dict = { + 'name': trigger_type.name, + 'pack': trigger_type.pack, + 'type': trigger_type.get_reference().ref + } + return TriggerService.create_trigger_db(trigger_dict) else: LOG.debug('Won\'t create Trigger object as TriggerType %s expects ' + 'parameters.', trigger_type) return None -def _add_trigger_models(pack, trigger_type): - description = trigger_type['description'] if 'description' in trigger_type else '' - payload_schema = trigger_type['payload_schema'] if 'payload_schema' in trigger_type else {} - parameters_schema = trigger_type['parameters_schema'] \ - if 'parameters_schema' in trigger_type else {} - - trigger_type = _create_trigger_type( - pack=pack, - name=trigger_type['name'], - description=description, - payload_schema=payload_schema, - parameters_schema=parameters_schema - ) - trigger = _create_trigger(pack=pack, - trigger_type=trigger_type) +def _add_trigger_models(trigger_type): + trigger_type = _create_trigger_type(trigger_type) + trigger = _create_trigger(trigger_type=trigger_type) return (trigger_type, trigger) -def add_trigger_models(pack, trigger_types): +def add_trigger_models(trigger_types): """ Register trigger types. - :param pack: Content pack those triggers belong to. - :type pack: ``str`` - :param trigger_types: A list of triggers to register. - :type trigger_types: ``list`` of ``tuple`` (trigger_type, trigger) + :type trigger_types: ``list`` of trigger_type. """ [r for r in (_validate_trigger_type(trigger_type) for trigger_type in trigger_types) if r is not None] result = [] for trigger_type in trigger_types: - item = _add_trigger_models(pack=pack, - trigger_type=trigger_type) + item = _add_trigger_models(trigger_type=trigger_type) if item: result.append(item) diff --git a/st2reactor/st2reactor/contrib/sensors/__init__.py b/st2reactor/st2reactor/contrib/sensors/__init__.py deleted file mode 100644 index 933346ddea..0000000000 --- a/st2reactor/st2reactor/contrib/sensors/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -""" -Module contains StackStorm built sensor samples -""" diff --git a/st2reactor/st2reactor/contrib/sensors/st2_generic_webhook_sensor.yaml b/st2reactor/st2reactor/contrib/sensors/st2_generic_webhook_sensor.yaml deleted file mode 100644 index 1bcd27625f..0000000000 --- a/st2reactor/st2reactor/contrib/sensors/st2_generic_webhook_sensor.yaml +++ /dev/null @@ -1,16 +0,0 @@ ---- - description: "Trigger type for generic webhooks." - trigger_types: - - - name: "st2.webhook" - description: "Triggers on received webhook" - payload_schema: - type: "object" - parameters_schema: - additionalProperties: false - required: - - "url" - type: "object" - properties: - url: - type: "string" diff --git a/st2reactor/st2reactor/contrib/sensors/st2_timer_sensor.py b/st2reactor/st2reactor/contrib/sensors/st2_timer_sensor.py deleted file mode 100644 index fa63540179..0000000000 --- a/st2reactor/st2reactor/contrib/sensors/st2_timer_sensor.py +++ /dev/null @@ -1,258 +0,0 @@ -# Licensed to the StackStorm, Inc ('StackStorm') under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from datetime import datetime - -from apscheduler.schedulers.background import BlockingScheduler -from apscheduler.triggers.cron import CronTrigger -from apscheduler.triggers.date import DateTrigger -from apscheduler.triggers.interval import IntervalTrigger -import apscheduler.util as aps_utils -from dateutil.tz import tzutc -import dateutil.parser as date_parser -import jsonschema - -from st2common.constants.pack import SYSTEM_PACK_NAME -from st2common.models.system.common import ResourceReference -from st2reactor.sensor.base import Sensor - - -INTERVAL_PARAMETERS_SCHEMA = { - "type": "object", - "properties": { - "timezone": { - "type": "string" - }, - "unit": { - "enum": ["weeks", "days", "hours", "minutes", "seconds"] - }, - "delta": { - "type": "integer" - } - }, - "required": [ - "unit", - "delta" - ], - "additionalProperties": False -} - -DATE_PARAMETERS_SCHEMA = { - "type": "object", - "properties": { - "timezone": { - "type": "string" - }, - "date": { - "type": "string", - "format": "date-time" - } - }, - "required": [ - "date" - ], - "additionalProperties": False -} - -CRON_PARAMETERS_SCHEMA = { - "type": "object", - "properties": { - "timezone": { - "type": "string" - }, - "year": { - "type": "integer" - }, - "month": { - "type": "integer", - "minimum": 1, - "maximum": 12 - }, - "day": { - "type": "integer", - "minimum": 1, - "maximum": 31 - }, - "week": { - "type": "integer", - "minimum": 1, - "maximum": 53 - }, - "day_of_week": { - "type": "integer", - "minimum": 0, - "maximum": 6 - }, - "hour": { - "type": "integer", - "minimum": 0, - "maximum": 23 - }, - "minute": { - "type": "integer", - "minimum": 0, - "maximum": 59 - }, - "second": { - "type": "integer", - "minimum": 0, - "maximum": 59 - } - }, - "additionalProperties": False -} - -PAYLOAD_SCHEMA = { - "type": "object", - "properties": { - "executed_at": { - "type": "string", - "format": "date-time", - "default": "2014-07-30 05:04:24.578325" - }, - "schedule": { - "type": "object", - "default": { - "delta": 30, - "units": "seconds" - } - } - } -} - -TRIGGER_TYPES = { - ResourceReference.to_string_reference(SYSTEM_PACK_NAME, 'st2.IntervalTimer'): { - 'name': 'st2.IntervalTimer', - 'pack': SYSTEM_PACK_NAME, - 'description': 'Triggers on specified intervals. e.g. every 30s, 1week etc.', - 'payload_schema': PAYLOAD_SCHEMA, - 'parameters_schema': INTERVAL_PARAMETERS_SCHEMA - }, - ResourceReference.to_string_reference(SYSTEM_PACK_NAME, 'st2.DateTimer'): { - 'name': 'st2.DateTimer', - 'pack': SYSTEM_PACK_NAME, - 'description': 'Triggers exactly once when the current time matches the specified time. ' - 'e.g. 
timezone:UTC date:2014-12-31 23:59:59.', - 'payload_schema': PAYLOAD_SCHEMA, - 'parameters_schema': DATE_PARAMETERS_SCHEMA - }, - ResourceReference.to_string_reference(SYSTEM_PACK_NAME, 'st2.CronTimer'): { - 'name': 'st2.CronTimer', - 'pack': SYSTEM_PACK_NAME, - 'description': 'Triggers whenever current time matches the specified time constaints like ' - 'a UNIX cron scheduler.', - 'payload_schema': PAYLOAD_SCHEMA, - 'parameters_schema': CRON_PARAMETERS_SCHEMA - } -} - - -class St2TimerSensor(Sensor): - ''' - A timer sensor that uses APScheduler 3.0. - ''' - def __init__(self, sensor_service=None): - self._timezone = 'America/Los_Angeles' # Whatever TZ local box runs in. - self._sensor_service = sensor_service - self._log = self._sensor_service.get_logger(self.__class__.__name__) - self._scheduler = BlockingScheduler(timezone=self._timezone) - self._jobs = {} - - def setup(self): - pass - - def run(self): - self._scheduler.start() - - def cleanup(self): - self._scheduler.shutdown(wait=True) - - def add_trigger(self, trigger): - self._add_job_to_scheduler(trigger) - - def update_trigger(self, trigger): - self.remove_trigger(trigger) - self.add_trigger(trigger) - - def remove_trigger(self, trigger): - id = trigger['id'] - - try: - job_id = self._jobs[id] - except KeyError: - self._log.info('Job not found: %s', id) - return - - self._scheduler.remove_job(job_id) - - def _get_trigger_type(self, ref): - pass - - def _add_job_to_scheduler(self, trigger): - trigger_type_ref = trigger['type'] - trigger_type = TRIGGER_TYPES[trigger_type_ref] - try: - jsonschema.validate(trigger['parameters'], - trigger_type['parameters_schema']) - except jsonschema.ValidationError as e: - self._log.error('Exception scheduling timer: %s, %s', - trigger['parameters'], e, exc_info=True) - raise # Or should we just return? - - time_spec = trigger['parameters'] - time_zone = aps_utils.astimezone(trigger['parameters'].get('timezone')) - - time_type = None - - if trigger_type['name'] == 'st2.IntervalTimer': - unit = time_spec.get('unit', None) - value = time_spec.get('delta', None) - time_type = IntervalTrigger(**{unit: value, 'timezone': time_zone}) - elif trigger_type['name'] == 'st2.DateTimer': - # Raises an exception if date string isn't a valid one. - dat = date_parser.parse(time_spec.get('date', None)) - time_type = DateTrigger(dat, timezone=time_zone) - elif trigger_type['name'] == 'st2.CronTimer': - cron = time_spec.copy() - cron['timezone'] = time_zone - - time_type = CronTrigger(**cron) - - if hasattr(time_type, 'run_date') and datetime.now(tzutc()) > time_type.run_date: - self._log.warning('Not scheduling expired timer: %s : %s', - trigger['parameters'], time_type.run_date) - else: - self._add_job(trigger, time_type) - - def _add_job(self, trigger, time_type, replace=True): - try: - job = self._scheduler.add_job(self._emit_trigger_instance, - trigger=time_type, - args=[trigger], - replace_existing=replace) - self._log.info('Job %s scheduled.', job.id) - self._jobs[trigger['id']] = job.id - except Exception as e: - self._log.error('Exception scheduling timer: %s, %s', - trigger['parameters'], e, exc_info=True) - - def _emit_trigger_instance(self, trigger): - self._log.info('Timer fired at: %s. 
Trigger: %s', str(datetime.utcnow()), trigger) - - payload = { - 'executed_at': str(datetime.utcnow()), - 'schedule': trigger['parameters'].get('time') - } - self._sensor_service.dispatch(trigger, payload) diff --git a/st2reactor/st2reactor/contrib/sensors/st2_timer_sensor.yaml b/st2reactor/st2reactor/contrib/sensors/st2_timer_sensor.yaml deleted file mode 100644 index bf0402e2ff..0000000000 --- a/st2reactor/st2reactor/contrib/sensors/st2_timer_sensor.yaml +++ /dev/null @@ -1,118 +0,0 @@ ---- - class_name: "St2TimerSensor" - entry_point: "st2_timer_sensor.py" - description: "Sensor which emits triggers based on the user-defined schedule or interval" - trigger_types: - - - name: "st2.IntervalTimer" - description: "Triggers on specified intervals. e.g. every 30s, 1week etc." - payload_schema: - type: "object" - properties: - executed_at: - type: "string" - format: "date-time" - default: "2014-07-30 05:04:24.578325" - schedule: - type: "object" - default: - delta: 30 - units: "seconds" - type: "object" - parameters_schema: - additionalProperties: false - required: - - "unit" - - "delta" - type: "object" - properties: - timezone: - type: "string" - unit: - enum: - - "weeks" - - "days" - - "hours" - - "minutes" - - "seconds" - delta: - type: "integer" - - - name: "st2.DateTimer" - description: "Triggers exactly once when the current time matches the specified time. e.g. timezone:UTC date:2014-12-31 23:59:59." - payload_schema: - type: "object" - properties: - executed_at: - type: "string" - format: "date-time" - default: "2014-07-30 05:04:24.578325" - schedule: - type: "object" - default: - delta: 30 - units: "seconds" - type: "object" - parameters_schema: - additionalProperties: false - required: - - "date" - type: "object" - properties: - date: - type: "string" - format: "date-time" - timezone: - type: "string" - - - name: "st2.CronTimer" - description: "Triggers whenever current time matches the specified time constaints like a UNIX cron scheduler." - payload_schema: - type: "object" - properties: - executed_at: - type: "string" - format: "date-time" - default: "2014-07-30 05:04:24.578325" - schedule: - type: "object" - default: - delta: 30 - units: "seconds" - type: "object" - parameters_schema: - additionalProperties: false - type: "object" - properties: - week: - minimum: 1 - type: "integer" - maximum: 53 - second: - minimum: 0 - type: "integer" - maximum: 59 - minute: - minimum: 0 - type: "integer" - maximum: 59 - hour: - minimum: 0 - type: "integer" - maximum: 23 - year: - type: "integer" - timezone: - type: "string" - day: - minimum: 1 - type: "integer" - maximum: 31 - day_of_week: - minimum: 0 - type: "integer" - maximum: 6 - month: - minimum: 1 - type: "integer" - maximum: 12 diff --git a/st2reactor/st2reactor/contrib/sensors/st2_webhook_sensor.py b/st2reactor/st2reactor/contrib/sensors/st2_webhook_sensor.py deleted file mode 100644 index b6ba116a63..0000000000 --- a/st2reactor/st2reactor/contrib/sensors/st2_webhook_sensor.py +++ /dev/null @@ -1,131 +0,0 @@ -# Licensed to the StackStorm, Inc ('StackStorm') under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from functools import wraps -import os -import six - -from flask import (jsonify, request, Flask) -import flask_jsonschema -from oslo.config import cfg - -from st2common import log as logging -from st2common.models.system.common import ResourceReference, InvalidResourceReferenceError -from st2reactor.sensor.base import Sensor - -http_client = six.moves.http_client - -LOG = logging.getLogger(__name__) -HOST = cfg.CONF.st2_webhook_sensor.host -PORT = cfg.CONF.st2_webhook_sensor.port -BASE_URL = cfg.CONF.st2_webhook_sensor.url - - -# Dectorators for request validations. -def validate_json(f): - @wraps(f) - def wrapper(*args, **kw): - try: - request.json - except Exception: - msg = 'Content-Type must be application/json.' - return jsonify({'error': msg}), http_client.BAD_REQUEST - return f(*args, **kw) - return wrapper - - -class St2WebhookSensor(Sensor): - ''' - A webhook sensor using a micro-framework Flask. - ''' - # Flask specific stuff. - _app = Flask('st2_webhook_sensor') - _app.config['JSONSCHEMA_DIR'] = os.path.join(_app.root_path, 'st2webhookschemas') - - jsonschema = flask_jsonschema.JsonSchema(_app) - - @_app.errorhandler(flask_jsonschema.ValidationError) - def on_validation_error(e): - data = {'error': str(e)} - js = jsonify(data) - return js, http_client.BAD_REQUEST - - def __init__(self, dispatcher): - self._dispatcher = dispatcher - self._log = self._dispatcher.get_logger(self.__class__.__name__) - self._host = HOST - self._port = PORT - - def setup(self): - self._setup_flask_app() - - def run(self): - St2WebhookSensor._app.run(port=self._port, host=self._host) - - def cleanup(self): - # If Flask is using the default Werkzeug server, then call shutdown on it. - func = request.environ.get('werkzeug.server.shutdown') - if func is None: - raise RuntimeError('Not running with the Werkzeug Server') - func() - - def add_trigger(self, trigger): - pass - - def update_trigger(self, trigger): - pass - - def remove_trigger(self, trigger): - pass - - @validate_json - @flask_jsonschema.validate('st2webhooks', 'create') - def _handle_webhook(self): - body = request.get_json() - - try: - trigger, payload = self._to_trigger(body) - except KeyError as e: - self._log.exception('Exception %s handling webhook', e) - return jsonify({'invalid': str(e)}), http_client.BAD_REQUEST - - try: - self._dispatcher.dispatch(trigger, payload) - except Exception as e: - self._log.exception('Exception %s handling webhook', e) - status = http_client.INTERNAL_SERVER_ERROR - return jsonify({'error': str(e)}), status - - return jsonify({}), http_client.ACCEPTED - - # Flask app specific stuff. 
- def _setup_flask_app(self): - St2WebhookSensor._app.add_url_rule(BASE_URL, 'st2webhooks', self._handle_webhook, - methods=['POST']) - - def _to_trigger(self, body): - trigger = body.get('trigger', '') - trigger_ref = None - try: - trigger_ref = ResourceReference.from_string_reference(ref=trigger) - except InvalidResourceReferenceError: - LOG.debug('Unable to parse reference.', exc_info=True) - - return { - 'name': trigger_ref.name if trigger_ref else None, - 'pack': trigger_ref.pack if trigger_ref else None, - 'type': body.get('type', ''), - 'parameters': {} - }, body['payload'] diff --git a/st2reactor/st2reactor/contrib/sensors/st2webhookschemas/st2webhooks.json b/st2reactor/st2reactor/contrib/sensors/st2webhookschemas/st2webhooks.json deleted file mode 100644 index 6d0ab8d493..0000000000 --- a/st2reactor/st2reactor/contrib/sensors/st2webhookschemas/st2webhooks.json +++ /dev/null @@ -1,21 +0,0 @@ -{ - "create": { - "type": "object", - "properties": { - "trigger": { - "description": "Reference to the Trigger. Can use reference to a parameter-less TriggerType.", - "type": "string" - }, - "type": { - "description": "Type of the Trigger i.e. the TriggerType. Used with a parameterized TriggerType where the payload would further clarify the Trigger.", - "type": "string", - "default": "" - }, - "payload": { - "description": "Payload as described at time of TriggerType registration.", - "type": "object" - } - }, - "required": ["payload"] - } -} diff --git a/st2reactor/st2reactor/rules/config.py b/st2reactor/st2reactor/rules/config.py index 50d2e7237f..8900b8d323 100644 --- a/st2reactor/st2reactor/rules/config.py +++ b/st2reactor/st2reactor/rules/config.py @@ -33,6 +33,12 @@ def _register_rules_engine_opts(): ] CONF.register_opts(logging_opts, group='rulesengine') + timer_opts = [ + cfg.StrOpt('local_timezone', default='America/Los_Angeles', + help='Timezone pertaining to the location where st2 is run.') + ] + CONF.register_opts(timer_opts, group='timer') + def register_opts(): _register_common_opts() diff --git a/st2reactor/st2reactor/contrib/__init__.py b/st2reactor/st2reactor/timer/__init__.py similarity index 100% rename from st2reactor/st2reactor/contrib/__init__.py rename to st2reactor/st2reactor/timer/__init__.py diff --git a/st2reactor/st2reactor/timer/base.py b/st2reactor/st2reactor/timer/base.py new file mode 100644 index 0000000000..fb53c7eced --- /dev/null +++ b/st2reactor/st2reactor/timer/base.py @@ -0,0 +1,161 @@ +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from datetime import datetime + +from apscheduler.schedulers.background import BlockingScheduler +from apscheduler.triggers.cron import CronTrigger +from apscheduler.triggers.date import DateTrigger +from apscheduler.triggers.interval import IntervalTrigger +import apscheduler.util as aps_utils +from dateutil.tz import tzutc +import dateutil.parser as date_parser +import jsonschema + +from st2common import log as logging +from st2common.constants.triggers import TIMER_TRIGGER_TYPES +from st2common.services.triggerwatcher import TriggerWatcher +from st2common.transport.reactor import TriggerDispatcher +import st2reactor.container.utils as container_utils + +LOG = logging.getLogger(__name__) + + +class St2Timer(object): + """ + A timer interface that uses APScheduler 3.0. + """ + def __init__(self, local_timezone=None): + self._timezone = local_timezone + self._scheduler = BlockingScheduler(timezone=self._timezone) + self._jobs = {} + self._trigger_types = TIMER_TRIGGER_TYPES.keys() + self._trigger_watcher = TriggerWatcher(create_handler=self._handle_create_trigger, + update_handler=self._handle_update_trigger, + delete_handler=self._handle_delete_trigger, + trigger_types=self._trigger_types, + queue_suffix='timers') + self._trigger_dispatcher = TriggerDispatcher(LOG) + + def start(self): + self._register_timer_trigger_types() + self._scheduler.start() + + def cleanup(self): + self._scheduler.shutdown(wait=True) + + def add_trigger(self, trigger): + self._add_job_to_scheduler(trigger) + + def update_trigger(self, trigger): + self.remove_trigger(trigger) + self.add_trigger(trigger) + + def remove_trigger(self, trigger): + id = trigger['id'] + + try: + job_id = self._jobs[id] + except KeyError: + LOG.info('Job not found: %s', id) + return + + self._scheduler.remove_job(job_id) + + def _add_job_to_scheduler(self, trigger): + trigger_type_ref = trigger['type'] + trigger_type = TIMER_TRIGGER_TYPES[trigger_type_ref] + try: + jsonschema.validate(trigger['parameters'], + trigger_type['parameters_schema']) + except jsonschema.ValidationError as e: + LOG.error('Exception scheduling timer: %s, %s', + trigger['parameters'], e, exc_info=True) + raise # Or should we just return? + + time_spec = trigger['parameters'] + time_zone = aps_utils.astimezone(trigger['parameters'].get('timezone')) + + time_type = None + + if trigger_type['name'] == 'st2.IntervalTimer': + unit = time_spec.get('unit', None) + value = time_spec.get('delta', None) + time_type = IntervalTrigger(**{unit: value, 'timezone': time_zone}) + elif trigger_type['name'] == 'st2.DateTimer': + # Raises an exception if date string isn't a valid one. 
+ dat = date_parser.parse(time_spec.get('date', None)) + time_type = DateTrigger(dat, timezone=time_zone) + elif trigger_type['name'] == 'st2.CronTimer': + cron = time_spec.copy() + cron['timezone'] = time_zone + + time_type = CronTrigger(**cron) + + if hasattr(time_type, 'run_date') and datetime.now(tzutc()) > time_type.run_date: + LOG.warning('Not scheduling expired timer: %s : %s', + trigger['parameters'], time_type.run_date) + else: + self._add_job(trigger, time_type) + + def _add_job(self, trigger, time_type, replace=True): + try: + job = self._scheduler.add_job(self._emit_trigger_instance, + trigger=time_type, + args=[trigger], + replace_existing=replace) + LOG.info('Job %s scheduled.', job.id) + self._jobs[trigger['id']] = job.id + except Exception as e: + LOG.error('Exception scheduling timer: %s, %s', + trigger['parameters'], e, exc_info=True) + + def _emit_trigger_instance(self, trigger): + LOG.info('Timer fired at: %s. Trigger: %s', str(datetime.utcnow()), trigger) + + payload = { + 'executed_at': str(datetime.utcnow()), + 'schedule': trigger['parameters'].get('time') + } + self._trigger_dispatcher.dispatch(trigger, payload) + + def _register_timer_trigger_types(self): + return container_utils.add_trigger_models(TIMER_TRIGGER_TYPES.values()) + + ############################################## + # Event handler methods for the trigger events + ############################################## + + def _handle_create_trigger(self, trigger): + LOG.debug('Calling "add_trigger" method (trigger.type=%s)' % (trigger.type)) + trigger = self._sanitize_trigger(trigger=trigger) + self.add_trigger(trigger=trigger) + + def _handle_update_trigger(self, trigger): + LOG.debug('Calling "update_trigger" method (trigger.type=%s)' % (trigger.type)) + trigger = self._sanitize_trigger(trigger=trigger) + self.update_trigger(trigger=trigger) + + def _handle_delete_trigger(self, trigger): + LOG.debug('Calling "remove_trigger" method (trigger.type=%s)' % (trigger.type)) + trigger = self._sanitize_trigger(trigger=trigger) + self.remove_trigger(trigger=trigger) + + def _sanitize_trigger(self, trigger): + sanitized = trigger._data + if 'id' in sanitized: + # Friendly objectid rather than the MongoEngine representation. 
+ sanitized['id'] = str(sanitized['id']) + return sanitized diff --git a/st2reactor/tests/unit/test_container_utils.py b/st2reactor/tests/unit/test_container_utils.py index c1a8e3cc47..7fbfbdb64f 100644 --- a/st2reactor/tests/unit/test_container_utils.py +++ b/st2reactor/tests/unit/test_container_utils.py @@ -15,18 +15,18 @@ import mock -from st2common.persistence.reactor import Trigger, TriggerType -from st2common.models.db.reactor import TriggerDB, TriggerTypeDB +from st2common.persistence.reactor import TriggerType +from st2common.models.db.reactor import TriggerDB from st2common.transport.publishers import PoolPublisher import st2reactor.container.utils as container_utils from st2tests.base import DbTestCase -MOCK_TRIGGER_TYPE = TriggerTypeDB() -MOCK_TRIGGER_TYPE.id = 'trigger-type-test.id' -MOCK_TRIGGER_TYPE.name = 'trigger-type-test.name' -MOCK_TRIGGER_TYPE.pack = 'dummy_pack_1' -MOCK_TRIGGER_TYPE.parameters_schema = {} -MOCK_TRIGGER_TYPE.payload_info = {} +MOCK_TRIGGER_TYPE = {} +MOCK_TRIGGER_TYPE['id'] = 'trigger-type-test.id' +MOCK_TRIGGER_TYPE['name'] = 'trigger-type-test.name' +MOCK_TRIGGER_TYPE['pack'] = 'dummy_pack_1' +MOCK_TRIGGER_TYPE['parameters_schema'] = {} +MOCK_TRIGGER_TYPE['payload_schema'] = {} MOCK_TRIGGER = TriggerDB() MOCK_TRIGGER.id = 'trigger-test.id' @@ -38,16 +38,6 @@ @mock.patch.object(PoolPublisher, 'publish', mock.MagicMock()) class ContainerUtilsTest(DbTestCase): - @mock.patch.object(TriggerType, 'query', mock.MagicMock( - return_value=[MOCK_TRIGGER_TYPE])) - @mock.patch.object(Trigger, 'get_by_name', mock.MagicMock(return_value=MOCK_TRIGGER)) - @mock.patch.object(TriggerType, 'add_or_update') - def test_add_trigger(self, mock_add_handler): - mock_add_handler.return_value = MOCK_TRIGGER_TYPE - container_utils.add_trigger_models(pack='dummy_pack_1', - trigger_types=[MOCK_TRIGGER_TYPE]) - self.assertTrue(mock_add_handler.called, 'trigger not added.') - def test_add_trigger_type(self): """ This sensor has misconfigured trigger type. We shouldn't explode. @@ -89,12 +79,11 @@ def test_add_trigger_type_no_params(self): 'parameters_schema': {}, 'payload_schema': {} } - trigtype_dbs = container_utils.add_trigger_models(pack='my_pack_1', - trigger_types=[trig_type]) + trigtype_dbs = container_utils.add_trigger_models(trigger_types=[trig_type]) trigger_type, trigger = trigtype_dbs[0] trigtype_db = TriggerType.get_by_id(trigger_type.id) - self.assertEqual(trigtype_db.pack, 'my_pack_1') + self.assertEqual(trigtype_db.pack, 'dummy_pack_1') self.assertEqual(trigtype_db.name, trig_type.get('name')) self.assertTrue(trigger is not None) self.assertEqual(trigger.name, trigtype_db.name) @@ -112,13 +101,12 @@ def test_add_trigger_type_with_params(self): } trig_type = { 'name': 'myawesometriggertype2', - 'pack': 'dummy_pack_1', + 'pack': 'my_pack_1', 'description': 'Words cannot describe how awesome I am.', 'parameters_schema': PARAMETERS_SCHEMA, 'payload_schema': {} } - trigtype_dbs = container_utils.add_trigger_models(pack='my_pack_1', - trigger_types=[trig_type]) + trigtype_dbs = container_utils.add_trigger_models(trigger_types=[trig_type]) trigger_type, trigger = trigtype_dbs[0] trigtype_db = TriggerType.get_by_id(trigger_type.id)
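
Note on the pattern introduced above: the system webhook and timer trigger-type definitions move out of the contrib sensor YAML files into st2common.constants.triggers, and each consumer (WebhooksController, St2Timer) registers them at startup through st2common.services.triggers.create_trigger_type_db(). A minimal sketch of that registration step is below; it assumes the service has already established its database connection (as the components above do during their own setup), and the helper name register_system_trigger_types() is purely illustrative, not part of the diff.

    # Sketch only -- not part of the patch.
    from st2common.constants.triggers import SYSTEM_TRIGGER_TYPES
    import st2common.services.triggers as trigger_service

    def register_system_trigger_types():
        """Idempotently ensure every system TriggerType exists in the DB."""
        trigger_type_dbs = []
        for trigger_type in SYSTEM_TRIGGER_TYPES.values():
            # create_trigger_type_db() first looks the TriggerType up by its
            # pack.name reference and only inserts when it is missing, so
            # calling this on every service start is safe.
            trigger_type_dbs.append(
                trigger_service.create_trigger_type_db(trigger_type))
        return trigger_type_dbs

WebhooksController._register_webhook_trigger_types() follows this shape directly for WEBHOOK_TRIGGER_TYPES, while St2Timer._register_timer_trigger_types() goes through container_utils.add_trigger_models(), which additionally creates a Trigger object for any trigger type that declares no parameters_schema.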