Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions st2api/st2api/controllers/v1/webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
from urlparse import urljoin

from st2common import log as logging
from st2common.constants.triggers import GENERIC_WEBHOOK_TRIGGER_REF
from st2common.constants.triggers import WEBHOOK_TRIGGER_TYPES
from st2common.models.base import jsexpose
import st2common.services.triggers as trigger_service
from st2common.services.triggerwatcher import TriggerWatcher
from st2common.transport.reactor import TriggerDispatcher

http_client = six.moves.http_client

LOG = logging.getLogger(__name__)
Expand All @@ -39,7 +39,7 @@ def __init__(self, *args, **kwargs):
super(WebhooksController, self).__init__(*args, **kwargs)
self._hooks = {}
self._base_url = '/webhooks/'
self._trigger_types = [GENERIC_WEBHOOK_TRIGGER_REF]
self._trigger_types = WEBHOOK_TRIGGER_TYPES.keys()

self._trigger_dispatcher = TriggerDispatcher(LOG)
self._trigger_watcher = TriggerWatcher(create_handler=self._handle_create_trigger,
Expand All @@ -48,6 +48,7 @@ def __init__(self, *args, **kwargs):
trigger_types=self._trigger_types,
queue_suffix='webhooks')
self._trigger_watcher.start()
self._register_webhook_trigger_types()

@jsexpose()
def get_all(self):
Expand Down Expand Up @@ -99,6 +100,10 @@ def _is_valid_hook(self, hook):
def _get_trigger_for_hook(self, hook):
return self._hooks[hook]

def _register_webhook_trigger_types(self):
for trigger_type in WEBHOOK_TRIGGER_TYPES.values():
trigger_service.create_trigger_type_db(trigger_type)

def add_trigger(self, trigger):
url = trigger['parameters']['url']
LOG.info('Listening to endpoint: %s', urljoin(self._base_url, url))
Expand Down
4 changes: 2 additions & 2 deletions st2api/tests/unit/controllers/v1/test_webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from tests import FunctionalTest

from st2api.controllers.v1.webhooks import WebhooksController
from st2common.constants.triggers import GENERIC_WEBHOOK_TRIGGER_REF
from st2common.constants.triggers import WEBHOOK_TRIGGER_TYPES
from st2common.models.db.reactor import TriggerDB
from st2common.transport.reactor import TriggerInstancePublisher

Expand All @@ -37,7 +37,7 @@
}

DUMMY_TRIGGER = TriggerDB(name='pr-merged', pack='git')
DUMMY_TRIGGER.type = GENERIC_WEBHOOK_TRIGGER_REF
DUMMY_TRIGGER.type = WEBHOOK_TRIGGER_TYPES.keys()[0]


class TestTriggerTypeController(FunctionalTest):
Expand Down
166 changes: 165 additions & 1 deletion st2common/st2common/constants/triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,168 @@
# See the License for the specific language governing permissions and
# limitations under the License.

GENERIC_WEBHOOK_TRIGGER_REF = 'core.st2.webhook'
from st2common.constants.pack import SYSTEM_PACK_NAME
from st2common.models.system.common import ResourceReference


WEBHOOKS_PARAMETERS_SCHEMA = {
'type': 'object',
'properties': {
'url': {
'type': 'string'
}
},
'required': [
'url'
],
'additionalProperties': False
}


WEBHOOKS_PAYLOAD_SCHEMA = {
'type': 'object'
}

WEBHOOK_TRIGGER_TYPES = {
ResourceReference.to_string_reference(SYSTEM_PACK_NAME, 'st2.webhook'): {
'name': 'st2.webhook',
'pack': SYSTEM_PACK_NAME,
'description': 'Trigger type for registering webhooks that can consume'
+ ' arbitrary payload.',
'parameters_schema': WEBHOOKS_PARAMETERS_SCHEMA,
'payload_schema': WEBHOOKS_PAYLOAD_SCHEMA
}
}

# Timer specs

INTERVAL_PARAMETERS_SCHEMA = {
"type": "object",
"properties": {
"timezone": {
"type": "string"
},
"unit": {
"enum": ["weeks", "days", "hours", "minutes", "seconds"]
},
"delta": {
"type": "integer"
}
},
"required": [
"unit",
"delta"
],
"additionalProperties": False
}

DATE_PARAMETERS_SCHEMA = {
"type": "object",
"properties": {
"timezone": {
"type": "string"
},
"date": {
"type": "string",
"format": "date-time"
}
},
"required": [
"date"
],
"additionalProperties": False
}

CRON_PARAMETERS_SCHEMA = {
"type": "object",
"properties": {
"timezone": {
"type": "string"
},
"year": {
"type": "integer"
},
"month": {
"type": "integer",
"minimum": 1,
"maximum": 12
},
"day": {
"type": "integer",
"minimum": 1,
"maximum": 31
},
"week": {
"type": "integer",
"minimum": 1,
"maximum": 53
},
"day_of_week": {
"type": "integer",
"minimum": 0,
"maximum": 6
},
"hour": {
"type": "integer",
"minimum": 0,
"maximum": 23
},
"minute": {
"type": "integer",
"minimum": 0,
"maximum": 59
},
"second": {
"type": "integer",
"minimum": 0,
"maximum": 59
}
},
"additionalProperties": False
}

TIMER_PAYLOAD_SCHEMA = {
"type": "object",
"properties": {
"executed_at": {
"type": "string",
"format": "date-time",
"default": "2014-07-30 05:04:24.578325"
},
"schedule": {
"type": "object",
"default": {
"delta": 30,
"units": "seconds"
}
}
}
}

TIMER_TRIGGER_TYPES = {
ResourceReference.to_string_reference(SYSTEM_PACK_NAME, 'st2.IntervalTimer'): {
'name': 'st2.IntervalTimer',
'pack': SYSTEM_PACK_NAME,
'description': 'Triggers on specified intervals. e.g. every 30s, 1week etc.',
'payload_schema': TIMER_PAYLOAD_SCHEMA,
'parameters_schema': INTERVAL_PARAMETERS_SCHEMA
},
ResourceReference.to_string_reference(SYSTEM_PACK_NAME, 'st2.DateTimer'): {
'name': 'st2.DateTimer',
'pack': SYSTEM_PACK_NAME,
'description': 'Triggers exactly once when the current time matches the specified time. '
'e.g. timezone:UTC date:2014-12-31 23:59:59.',
'payload_schema': TIMER_PAYLOAD_SCHEMA,
'parameters_schema': DATE_PARAMETERS_SCHEMA
},
ResourceReference.to_string_reference(SYSTEM_PACK_NAME, 'st2.CronTimer'): {
'name': 'st2.CronTimer',
'pack': SYSTEM_PACK_NAME,
'description': 'Triggers whenever current time matches the specified time constaints like '
'a UNIX cron scheduler.',
'payload_schema': TIMER_PAYLOAD_SCHEMA,
'parameters_schema': CRON_PARAMETERS_SCHEMA
}
}

SYSTEM_TRIGGER_TYPES = dict(WEBHOOK_TRIGGER_TYPES.items() + TIMER_TRIGGER_TYPES.items())
45 changes: 40 additions & 5 deletions st2common/st2common/services/triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
# limitations under the License.

from st2common import log as logging
from st2common.models.api.reactor import TriggerAPI
from st2common.models.api.reactor import (TriggerAPI, TriggerTypeAPI)
from st2common.models.system.common import ResourceReference
from st2common.persistence.reactor import Trigger
from st2common.persistence.reactor import (Trigger, TriggerType)

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -44,9 +44,7 @@ def get_trigger_db(trigger):
# TODO: This method should die in a fire
if isinstance(trigger, str) or isinstance(trigger, unicode):
# Assume reference was passed in
ref_obj = ResourceReference.from_string_reference(ref=trigger)
return _get_trigger_db_by_name_and_pack(name=ref_obj.name,
pack=ref_obj.pack)
return Trigger.get_by_ref(trigger)
if isinstance(trigger, dict):
name = trigger.get('name', None)
pack = trigger.get('pack', None)
Expand All @@ -73,6 +71,23 @@ def get_trigger_db(trigger):
raise Exception('Unrecognized object')


def get_trigger_type_db(ref):
"""
Returns the trigger type object from db given a string ref.

:param ref: Reference to the trigger type db object.
:type ref: ``str``

:rtype trigger_type: ``object``
"""
try:
return TriggerType.get_by_ref(ref)
except ValueError as e:
LOG.debug('Database lookup for ref="%s" resulted ' +
'in exception : %s.', ref, e, exc_info=True)
return None


def _get_trigger_api_given_rule(rule):
trigger = rule.trigger
triggertype_ref = ResourceReference.from_string_reference(trigger.get('type'))
Expand Down Expand Up @@ -103,3 +118,23 @@ def create_trigger_db(trigger):
def create_trigger_db_from_rule(rule):
trigger_api = _get_trigger_api_given_rule(rule)
return create_trigger_db(trigger_api)


def create_trigger_type_db(trigger_type):
"""
Creates a trigger type db object in the db given trigger_type definition as dict.

:param trigger_type: Trigger type model.
:type trigger_type: ``dict``

:rtype: ``object``
"""
trigger_type_api = TriggerTypeAPI(**trigger_type)
ref = ResourceReference.to_string_reference(name=trigger_type_api.name,
pack=trigger_type_api.pack)
trigger_type_db = get_trigger_type_db(ref)
if not trigger_type_db:
trigger_type_db = TriggerTypeAPI.to_model(trigger_type_api)
LOG.debug('verified trigger and formulated TriggerDB=%s', trigger_type_db)
trigger_type_db = TriggerType.add_or_update(trigger_type_db)
return trigger_type_db
9 changes: 1 addition & 8 deletions st2reactor/st2reactor/bootstrap/sensorsregistrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
LOG = logging.getLogger(__name__)

PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)))
SYSTEM_SENSORS_PATH = os.path.join(PATH, '../contrib/sensors')
SYSTEM_SENSORS_PATH = os.path.abspath(SYSTEM_SENSORS_PATH)


class SensorsRegistrar(object):
Expand Down Expand Up @@ -66,8 +64,7 @@ def _register_sensor_from_pack(self, pack, sensor):
poll_interval = metadata.get('poll_interval', None)

# Add TrigerType models to the DB
trigger_type_dbs = container_utils.add_trigger_models(pack=pack,
trigger_types=trigger_types)
trigger_type_dbs = container_utils.add_trigger_models(trigger_types=trigger_types)

# Populate a list of references belonging to this sensor
trigger_type_refs = []
Expand All @@ -94,10 +91,6 @@ def register_sensors_from_packs(self, base_dir):
pack_loader = ContentPackLoader()
dirs = pack_loader.get_content(base_dir=base_dir, content_type='sensors')

# Add system sensors to the core pack
dirs['core'] = {}
dirs['core'] = SYSTEM_SENSORS_PATH

for pack, sensors_dir in six.iteritems(dirs):
try:
LOG.info('Registering sensors from pack: %s', pack)
Expand Down
16 changes: 15 additions & 1 deletion st2reactor/st2reactor/cmd/rulesengine.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import sys

import eventlet
from oslo.config import cfg

from st2common import log as logging
Expand All @@ -9,6 +10,7 @@
from st2common.constants.logging import DEFAULT_LOGGING_CONF_PATH
from st2reactor.rules import config
from st2reactor.rules import worker
from st2reactor.timer.base import St2Timer

LOG = logging.getLogger('st2reactor.bin.rulesengine')

Expand Down Expand Up @@ -36,14 +38,26 @@ def _teardown():
db_teardown()


def _kickoff_rules_worker(worker):
worker.work()


def _kickoff_timer(timer):
timer.start()


def main():
timer = St2Timer(local_timezone=cfg.CONF.timer.local_timezone)
try:
_setup()
return worker.work()
timer_thread = eventlet.spawn(_kickoff_timer, timer)
worker_thread = eventlet.spawn(_kickoff_rules_worker, worker)
return (timer_thread.wait() and worker_thread.wait())
except SystemExit as exit_code:
sys.exit(exit_code)
except:
LOG.exception('(PID:%s) RulesEngine quit due to exception.', os.getpid())
return 1
finally:
timer.cleanup()
_teardown()
Loading