Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ Added

NOTE: Those options are only supported when using a default and officially supported AMQP backend
with RabbitMQ server. (new feature) #4541
* Add metrics instrumentation to the ``st2notifier`` service. For the available / exposed metrics,
please refer to https://docs.stackstorm.com/reference/metrics.html. (improvement) #4536

Changed
~~~~~~~
Expand Down
39 changes: 27 additions & 12 deletions st2actions/st2actions/notifier/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
from st2common.constants.keyvalue import FULL_SYSTEM_SCOPE, SYSTEM_SCOPE, DATASTORE_PARENT_SCOPE
from st2common.services.keyvalues import KeyValueLookup
from st2common.transport.queues import NOTIFIER_ACTIONUPDATE_WORK_QUEUE
from st2common.metrics.base import CounterWithTimer
from st2common.metrics.base import Timer

__all__ = [
'Notifier',
Expand Down Expand Up @@ -73,6 +75,7 @@ def __init__(self, connection, queues, trigger_dispatcher=None):
pack=ACTION_TRIGGER_TYPE['pack'],
name=ACTION_TRIGGER_TYPE['name'])

@CounterWithTimer(key='notifier.action.executions')
def process(self, execution_db):
execution_id = str(execution_db.id)
extra = {'execution': execution_db}
Expand All @@ -86,12 +89,18 @@ def process(self, execution_db):
# action execution will be applied by the workflow engine. A policy may affect the
# final state of the action execution thereby impacting the state of the workflow.
if not workflow_service.is_action_execution_under_workflow_context(execution_db):
policy_service.apply_post_run_policies(liveaction_db)
with CounterWithTimer(key='notifier.apply_post_run_policies'):
policy_service.apply_post_run_policies(liveaction_db)

if liveaction_db.notify is not None:
self._post_notify_triggers(liveaction_db=liveaction_db, execution_db=execution_db)
if liveaction_db.notify:
with CounterWithTimer(key='notifier.notify_trigger.post'):
self._post_notify_triggers(liveaction_db=liveaction_db,
execution_db=execution_db)

self._post_generic_trigger(liveaction_db=liveaction_db, execution_db=execution_db)
if cfg.CONF.action_sensor.enable:
with CounterWithTimer(key='notifier.generic_trigger.post'):
self._post_generic_trigger(liveaction_db=liveaction_db,
execution_db=execution_db)

def _get_execution_for_liveaction(self, liveaction):
execution = ActionExecution.get(liveaction__id=str(liveaction.id))
Expand Down Expand Up @@ -127,7 +136,7 @@ def _post_notify_subsection_triggers(self, liveaction_db=None, execution_db=None
notify_subsection=None,
default_message_suffix=None):
routes = (getattr(notify_subsection, 'routes') or
getattr(notify_subsection, 'channels', None))
getattr(notify_subsection, 'channels', [])) or []

execution_id = str(execution_db.id)

Expand All @@ -142,13 +151,15 @@ def _post_notify_subsection_triggers(self, liveaction_db=None, execution_db=None
)

try:
message = self._transform_message(message=message,
context=jinja_context)
with Timer(key='notifier.transform_message'):
message = self._transform_message(message=message,
context=jinja_context)
except:
LOG.exception('Failed (Jinja) transforming `message`.')

try:
data = self._transform_data(data=data, context=jinja_context)
with Timer(key='notifier.transform_data'):
data = self._transform_data(data=data, context=jinja_context)
except:
LOG.exception('Failed (Jinja) transforming `data`.')

Expand Down Expand Up @@ -187,8 +198,10 @@ def _post_notify_subsection_triggers(self, liveaction_db=None, execution_db=None
payload['channel'] = route
LOG.debug('POSTing %s for %s. Payload - %s.', NOTIFY_TRIGGER_TYPE['name'],
liveaction_db.id, payload)
self._trigger_dispatcher.dispatch(self._notify_trigger, payload=payload,
trace_context=trace_context)

with CounterWithTimer(key='notifier.notify_trigger.dispatch'):
self._trigger_dispatcher.dispatch(self._notify_trigger, payload=payload,
trace_context=trace_context)
except:
failed_routes.append(route)

Expand Down Expand Up @@ -254,8 +267,10 @@ def _post_generic_trigger(self, liveaction_db=None, execution_db=None):
trace_context = self._get_trace_context(execution_id=execution_id)
LOG.debug('POSTing %s for %s. Payload - %s. TraceContext - %s',
ACTION_TRIGGER_TYPE['name'], liveaction_db.id, payload, trace_context)
self._trigger_dispatcher.dispatch(self._action_trigger, payload=payload,
trace_context=trace_context)

with CounterWithTimer(key='notifier.generic_trigger.dispatch'):
self._trigger_dispatcher.dispatch(self._action_trigger, payload=payload,
trace_context=trace_context)

def _get_runner_ref(self, action_ref):
"""
Expand Down