diff --git a/CHANGELOG.rst b/CHANGELOG.rst index a6107c7374..7e5734a415 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -83,6 +83,12 @@ Changed ``st2rulesengine`` service. This would make such issues very hard to troubleshoot because only way to find out about this failure would be to inspect the ``st2rulesengine`` service logs. (improvement) #4231 +* Improve code metric instrumentation and instrument code and various services with more metrics. + (improvement) #4310 +* Add new ``metrics.prefix`` config option. With this option user can specify an optional prefix + which is prepended to each metric key (name). This comes handy in scenarios where user wants to + submit metrics from multiple environments / deployments (e.g. testing, staging, dev) to the same + backend instance. (improvement) #4310 Fixed ~~~~~ diff --git a/conf/metrics/carbon/storage-aggregation.conf b/conf/metrics/carbon/storage-aggregation.conf new file mode 100644 index 0000000000..08ad358d20 --- /dev/null +++ b/conf/metrics/carbon/storage-aggregation.conf @@ -0,0 +1,52 @@ +# Aggregation methods for whisper files. Entries are scanned in order, +# and first match wins. This file is scanned for changes every 60 seconds +# +# [name] +# pattern = +# xFilesFactor = +# aggregationMethod = +# +# name: Arbitrary unique name for the rule +# pattern: Regex pattern to match against the metric name +# xFilesFactor: Ratio of valid data points required for aggregation to the next retention to occur +# aggregationMethod: function to apply to data points for aggregation +# +[min] +pattern = \.min$ +xFilesFactor = 0.1 +aggregationMethod = min + +[max] +pattern = \.max$ +xFilesFactor = 0.1 +aggregationMethod = max + +[count] +pattern = \.count$ +xFilesFactor = 0 +aggregationMethod = sum + +[count_legacy] +pattern = ^stats_counts.* +xFilesFactor = 0 +aggregationMethod = sum + +[lower] +pattern = \.lower(_\d+)?$ +xFilesFactor = 0.1 +aggregationMethod = min + +[upper] +pattern = \.upper(_\d+)?$ +xFilesFactor = 0.1 +aggregationMethod = max + +[sum] +pattern = \.sum$ +xFilesFactor = 0 +aggregationMethod = sum + +[default_average] +pattern = .* +xFilesFactor = 0.5 +aggregationMethod = average diff --git a/conf/metrics/carbon/storage-schemas.conf b/conf/metrics/carbon/storage-schemas.conf new file mode 100644 index 0000000000..aa7fccde9c --- /dev/null +++ b/conf/metrics/carbon/storage-schemas.conf @@ -0,0 +1,20 @@ +# Schema definitions for Whisper files. Entries are scanned in order, +# and first match wins. This file is scanned for changes every 60 seconds. +# +# [name] +# pattern = regex +# retentions = timePerPoint:timeToStore, timePerPoint:timeToStore, ... + +# Carbon's internal metrics. This entry should match what is specified in +# CARBON_METRIC_PREFIX and CARBON_METRIC_INTERVAL settings +[stats] +pattern = ^stats.* +retentions = 10s:1d,1m:7d,10m:1y + +[carbon] +pattern = ^carbon\. +retentions = 60:90d + +[default_1min_for_1day] +pattern = .* +retentions = 60s:1d diff --git a/conf/metrics/statsd/localConfig.js b/conf/metrics/statsd/localConfig.js new file mode 100644 index 0000000000..44cb9e03ec --- /dev/null +++ b/conf/metrics/statsd/localConfig.js @@ -0,0 +1,19 @@ +// Sample statsd config for usage with metrics instrumentation +{ + // IP and port of a local or remote graphite instance to which statsd will + // submit metrics + graphiteHost: "127.0.0.1", + graphitePort: 2003, + + // statsd listen IP and port + address: "0.0.0.0", + port: 8125, + + // Enable debug mode for easier debugging, disable in production + debug: true, + + // Disable legacy name prefix + graphite: { + legacyNamespace: false + } +} diff --git a/conf/st2.conf.sample b/conf/st2.conf.sample index 413ce14106..55ad4e3680 100644 --- a/conf/st2.conf.sample +++ b/conf/st2.conf.sample @@ -183,6 +183,8 @@ cluster_urls = # comma separated list allowed here. [metrics] # Destination server to connect to if driver requires connection. host = 127.0.0.1 +# Optional prefix which is prepended to all the metric names. Comes handy when you want to submit metrics from various environment to the same metric backend instance. +prefix = None # Driver type for metrics collection. driver = noop # Destination port to connect to if driver requires connection. diff --git a/conf/st2.dev.conf b/conf/st2.dev.conf index e98322121e..21c5afc984 100644 --- a/conf/st2.dev.conf +++ b/conf/st2.dev.conf @@ -119,6 +119,6 @@ jitter_interval = 0 enable_common_libs = True [metrics] -driver = noop +driver = echo host = 127.0.0.1 port = 8125 diff --git a/st2actions/st2actions/container/base.py b/st2actions/st2actions/container/base.py index 9559dc0553..ee231a1a65 100644 --- a/st2actions/st2actions/container/base.py +++ b/st2actions/st2actions/container/base.py @@ -33,7 +33,7 @@ from st2common.util.action_db import (update_liveaction_status, get_liveaction_by_id) from st2common.util import param as param_utils from st2common.util.config_loader import ContentPackConfigLoader -from st2common.metrics.base import CounterWithTimer, format_metrics_key +from st2common.metrics.base import CounterWithTimer from st2common.util import jsonify from st2common.runners.base import get_runner @@ -82,7 +82,7 @@ def dispatch(self, liveaction_db): 'in an unsupported status of "%s".' % liveaction_db.status ) - with CounterWithTimer(key="st2.action.executions"): + with CounterWithTimer(key="action.executions"): liveaction_db = funcs[liveaction_db.status](runner) return liveaction_db.result @@ -122,9 +122,10 @@ def _do_run(self, runner): extra = {'runner': runner, 'parameters': resolved_action_params} LOG.debug('Performing run for runner: %s' % (runner.runner_id), extra=extra) - with CounterWithTimer(key=format_metrics_key(action_db=runner.action, key='action')): - (status, result, context) = runner.run(action_params) - result = jsonify.try_loads(result) + with CounterWithTimer(key='action.executions'): + with CounterWithTimer(key='action.%s.executions' % (runner.action.ref)): + (status, result, context) = runner.run(action_params) + result = jsonify.try_loads(result) action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES diff --git a/st2api/st2api/app.py b/st2api/st2api/app.py index 5fd756ca53..4526e0212d 100644 --- a/st2api/st2api/app.py +++ b/st2api/st2api/app.py @@ -22,6 +22,8 @@ from st2common.middleware.cors import CorsMiddleware from st2common.middleware.request_id import RequestIDMiddleware from st2common.middleware.logging import LoggingMiddleware +from st2common.middleware.instrumentation import RequestInstrumentationMiddleware +from st2common.middleware.instrumentation import ResponseInstrumentationMiddleware from st2common.router import Router from st2common.util.monkey_patch import monkey_patch from st2common.constants.system import VERSION_STRING @@ -75,6 +77,8 @@ def setup_app(config={}): app = ErrorHandlingMiddleware(app) app = CorsMiddleware(app) app = LoggingMiddleware(app, router) + app = ResponseInstrumentationMiddleware(app, service_name='api') app = RequestIDMiddleware(app) + app = RequestInstrumentationMiddleware(app, service_name='api') return app diff --git a/st2auth/st2auth/app.py b/st2auth/st2auth/app.py index e7050ffb6f..2bbf725ab5 100644 --- a/st2auth/st2auth/app.py +++ b/st2auth/st2auth/app.py @@ -20,6 +20,8 @@ from st2common.middleware.cors import CorsMiddleware from st2common.middleware.request_id import RequestIDMiddleware from st2common.middleware.logging import LoggingMiddleware +from st2common.middleware.instrumentation import RequestInstrumentationMiddleware +from st2common.middleware.instrumentation import ResponseInstrumentationMiddleware from st2common.router import Router from st2common.util.monkey_patch import monkey_patch from st2common.constants.system import VERSION_STRING @@ -69,6 +71,8 @@ def setup_app(config={}): app = ErrorHandlingMiddleware(app) app = CorsMiddleware(app) app = LoggingMiddleware(app, router) + app = ResponseInstrumentationMiddleware(app, service_name='auth') app = RequestIDMiddleware(app) + app = RequestInstrumentationMiddleware(app, service_name='auth') return app diff --git a/st2common/setup.py b/st2common/setup.py index 283fa5ddb9..473ed83c8a 100644 --- a/st2common/setup.py +++ b/st2common/setup.py @@ -68,6 +68,7 @@ 'st2common.metrics.driver': [ 'statsd = st2common.metrics.drivers.statsd_driver:StatsdDriver', 'noop = st2common.metrics.drivers.noop_driver:NoopDriver', + 'echo = st2common.metrics.drivers.echo_driver:EchoDriver', ], } ) diff --git a/st2common/st2common/config.py b/st2common/st2common/config.py index d712c313bf..347e6145ad 100644 --- a/st2common/st2common/config.py +++ b/st2common/st2common/config.py @@ -538,6 +538,11 @@ def register_opts(ignore_errors=False): cfg.IntOpt( 'port', default=8125, help='Destination port to connect to if driver requires connection.'), + cfg.StrOpt( + 'prefix', default=None, + help='Optional prefix which is prepended to all the metric names. Comes handy when ' + 'you want to submit metrics from various environment to the same metric ' + 'backend instance.') ] do_register_opts(metrics_opts, group='metrics', ignore_errors=ignore_errors) diff --git a/st2common/st2common/metrics/base.py b/st2common/st2common/metrics/base.py index 66f932d095..5c822aa351 100644 --- a/st2common/st2common/metrics/base.py +++ b/st2common/st2common/metrics/base.py @@ -12,20 +12,32 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + from __future__ import absolute_import + from functools import wraps import logging -from six import string_types from oslo_config import cfg from oslo_config.cfg import NoSuchOptError from stevedore.exception import NoMatches, MultipleMatches -from st2common.constants.metrics import METRICS_COUNTER_SUFFIX, METRICS_TIMER_SUFFIX +from st2common.metrics.utils import check_key from st2common.util.loader import get_plugin_instance from st2common.util.date import get_datetime_utc_now from st2common.exceptions.plugins import PluginLoadError +__all__ = [ + 'BaseMetricsDriver', + + 'Timer', + 'Counter', + 'CounterWithTimer', + + 'metrics_initialize', + 'get_driver' +] + if not hasattr(cfg.CONF, 'metrics'): from st2common.config import register_opts register_opts() @@ -33,95 +45,71 @@ LOG = logging.getLogger(__name__) PLUGIN_NAMESPACE = 'st2common.metrics.driver' -METRICS = None - - -def _strip_pack(action, pack): - formatted_pack = "%s." % (pack) - - if formatted_pack in action: - return action.replace(formatted_pack, '') - - return action - - -def _format_metrics_key_for_action_db(action_db): - action_pack = action_db.pack if action_db.pack else 'unknown' - action_name = _strip_pack(action_db.name, action_pack) - return [action_pack, action_name] - - -def _format_metrics_key_for_liveaction_db(liveaction_db): - action_pack = liveaction_db.context.get('pack', 'unknown') - action_name = _strip_pack(liveaction_db.action, action_pack) - return [action_pack, action_name] - -def format_metrics_key(action_db=None, liveaction_db=None, key=None): - """Return a string for usage as metrics key. - """ - assert (action_db or key or liveaction_db), """Must supply one of key, action_db, or - liveaction_db""" - metrics_key_items = ['st2'] - - if action_db: - metrics_key_items.extend(_format_metrics_key_for_action_db(action_db)) - - if liveaction_db: - metrics_key_items.extend( - _format_metrics_key_for_liveaction_db(liveaction_db) - ) - - if key: - metrics_key_items.append('%s' % key) - - metrics_key = '.'.join(metrics_key_items) - - LOG.debug("Generated Metrics Key: %s", metrics_key) - - return metrics_key +# Stores reference to the metrics driver class instance. +# NOTE: This value is populated lazily on the first get_driver() function call +METRICS = None class BaseMetricsDriver(object): - """ Base class for driver implementations for metric collection """ + Base class for driver implementations for metric collection + """ + def time(self, key, time): - """ Timer metric + """ + Timer metric """ pass def inc_counter(self, key, amount=1): - """ Increment counter + """ + Increment counter """ pass def dec_counter(self, key, amount=1): - """ Decrement metric + """ + Decrement metric """ pass + def set_gauge(self, key, value): + """ + Set gauge value. + """ + pass -def check_key(key): - """Ensure key meets requirements. - """ - assert isinstance(key, string_types), "Key not a string. Got %s" % type(key) - assert key, "Key cannot be empty string." + def inc_gauge(self, key, amount=1): + """ + Increment gauge value. + """ + pass + + def dec_gauge(self, key, amount=1): + """ + Decrement gauge value. + """ + pass class Timer(object): - """ Timer context manager for easily sending timer statistics. + """ + Timer context manager for easily sending timer statistics. """ def __init__(self, key, include_parameter=False): check_key(key) + self.key = key self._metrics = get_driver() self._include_parameter = include_parameter self._start_time = None def send_time(self, key=None): - """ Send current time from start time. """ - time_delta = get_datetime_utc_now() - self._start_time + Send current time from start time. + """ + time_delta = self.get_time_delta() if key: check_key(key) @@ -130,8 +118,10 @@ def send_time(self, key=None): self._metrics.time(self.key, time_delta.total_seconds()) def get_time_delta(self): - """ Get current time delta. """ + Get current time delta. + """ + return get_datetime_utc_now() - self._start_time def __enter__(self): @@ -152,7 +142,8 @@ def wrapper(*args, **kw): class Counter(object): - """ Timer context manager for easily sending timer statistics. + """ + Counter context manager for easily sending counter statistics. """ def __init__(self, key): check_key(key) @@ -164,7 +155,7 @@ def __enter__(self): return self def __exit__(self, *args): - self._metrics.dec_counter(self.key) + pass def __call__(self, func): @wraps(func) @@ -175,9 +166,11 @@ def wrapper(*args, **kw): class CounterWithTimer(object): - """ Timer and counter context manager for easily sending timer statistics + """ + Timer and counter context manager for easily sending counter statistics with builtin timer. """ + def __init__(self, key, include_parameter=False): check_key(key) self.key = key @@ -186,30 +179,30 @@ def __init__(self, key, include_parameter=False): self._start_time = None def send_time(self, key=None): - """ Send current time from start time. """ - time_delta = get_datetime_utc_now() - self._start_time + Send current time from start time. + """ + time_delta = self.get_time_delta() if key: check_key(key) self._metrics.time(key, time_delta.total_seconds()) else: - self._metrics.time("%s%s" % (self.key, METRICS_TIMER_SUFFIX), - time_delta.total_seconds()) + self._metrics.time(self.key, time_delta.total_seconds()) def get_time_delta(self): - """ Get current time delta. + """ + Get current time delta. """ return get_datetime_utc_now() - self._start_time def __enter__(self): - self._metrics.inc_counter("%s%s" % (self.key, METRICS_COUNTER_SUFFIX)) + self._metrics.inc_counter(self.key) self._start_time = get_datetime_utc_now() return self def __exit__(self, *args): self.send_time() - self._metrics.dec_counter("%s_counter" % (self.key)) def __call__(self, func): @wraps(func) @@ -222,9 +215,11 @@ def wrapper(*args, **kw): def metrics_initialize(): - """Initialize metrics constant + """ + Initialize metrics constant """ global METRICS + try: METRICS = get_plugin_instance(PLUGIN_NAMESPACE, cfg.CONF.metrics.driver) except (NoMatches, MultipleMatches, NoSuchOptError) as error: @@ -234,7 +229,8 @@ def metrics_initialize(): def get_driver(): - """Return metrics driver instance + """ + Return metrics driver instance """ if not METRICS: return metrics_initialize() diff --git a/st2common/st2common/metrics/drivers/prometheus_driver.py b/st2common/st2common/metrics/drivers/echo_driver.py similarity index 55% rename from st2common/st2common/metrics/drivers/prometheus_driver.py rename to st2common/st2common/metrics/drivers/echo_driver.py index 8a85ef8c20..55878e9b9a 100644 --- a/st2common/st2common/metrics/drivers/prometheus_driver.py +++ b/st2common/st2common/metrics/drivers/echo_driver.py @@ -12,37 +12,36 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from prometheus_client import Histogram, Gauge +from st2common import log as logging from st2common.metrics.base import BaseMetricsDriver +__all__ = [ + 'EchoDriver' +] -class PrometheusDriver(BaseMetricsDriver): - """ Base class for driver implementations for metric collection +LOG = logging.getLogger(__name__) + + +class EchoDriver(BaseMetricsDriver): + """ + Driver which logs / LOG.debugs out each metrics operation which would have been performed. """ - def __init__(self): - pass def time(self, key, time): - """ Timer metric - """ - prometheus_histogram = Histogram( # pylint: disable=no-value-for-parameter - key - ) - prometheus_histogram.observe(time) + LOG.debug('[metrics] time(key=%s, time=%s)' % (key, time)) def inc_counter(self, key, amount=1): - """ Increment counter - """ - prometheus_counter = Gauge( # pylint: disable=no-value-for-parameter - key - ) - prometheus_counter.inc(amount) + LOG.debug('[metrics] counter.incr(%s, %s)' % (key, amount)) def dec_counter(self, key, amount=1): - """ Decrement metric - """ - prometheus_counter = Gauge( # pylint: disable=no-value-for-parameter - key - ) - prometheus_counter.dec(amount) + LOG.debug('[metrics] counter.decr(%s, %s)' % (key, amount)) + + def set_gauge(self, key, value): + LOG.debug('[metrics] set_gauge(%s, %s)' % (key, value)) + + def inc_gauge(self, key, amount=1): + LOG.debug('[metrics] gauge.incr(%s, %s)' % (key, amount)) + + def dec_gauge(self, key, amount=1): + LOG.debug('[metrics] gauge.decr(%s, %s)' % (key, amount)) diff --git a/st2common/st2common/metrics/drivers/noop_driver.py b/st2common/st2common/metrics/drivers/noop_driver.py index 08ca257bb5..32c52277b4 100644 --- a/st2common/st2common/metrics/drivers/noop_driver.py +++ b/st2common/st2common/metrics/drivers/noop_driver.py @@ -12,11 +12,18 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + from st2common.metrics.base import BaseMetricsDriver +__all__ = [ + 'NoopDriver' +] + class NoopDriver(BaseMetricsDriver): - """ Dummy implementation of BaseMetricsDriver """ + Dummy implementation of BaseMetricsDriver + """ + def __init__(self, *_args, **_kwargs): pass diff --git a/st2common/st2common/metrics/drivers/statsd_driver.py b/st2common/st2common/metrics/drivers/statsd_driver.py index 7646c1d2dd..a4bce08be0 100644 --- a/st2common/st2common/metrics/drivers/statsd_driver.py +++ b/st2common/st2common/metrics/drivers/statsd_driver.py @@ -12,40 +12,92 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + from numbers import Number -from oslo_config import cfg + import statsd +from oslo_config import cfg -from st2common.metrics.base import BaseMetricsDriver, check_key +from st2common.metrics.base import BaseMetricsDriver +from st2common.metrics.utils import check_key +from st2common.metrics.utils import get_full_key_name + +__all__ = [ + 'StatsdDriver' +] class StatsdDriver(BaseMetricsDriver): - """ StatsD Implementation of the metrics driver + """ + StatsD Implementation of the metrics driver """ def __init__(self): statsd.Connection.set_defaults(host=cfg.CONF.metrics.host, port=cfg.CONF.metrics.port) + self._counters = {} self._timer = statsd.Timer('') def time(self, key, time): - """ Timer metric + """ + Timer metric """ check_key(key) assert isinstance(time, Number) + + key = get_full_key_name(key) self._timer.send(key, time) def inc_counter(self, key, amount=1): - """ Increment counter + """ + Increment counter """ check_key(key) assert isinstance(amount, Number) + + key = get_full_key_name(key) self._counters[key] = self._counters.get(key, statsd.Counter(key)) self._counters[key] += amount def dec_counter(self, key, amount=1): - """ Decrement metric + """ + Decrement metric """ check_key(key) assert isinstance(amount, Number) + + key = get_full_key_name(key) self._counters[key] = self._counters.get(key, statsd.Counter(key)) self._counters[key] -= amount + + def set_gauge(self, key, value): + """ + Set gauge value. + """ + check_key(key) + assert isinstance(value, Number) + + key = get_full_key_name(key) + gauge = statsd.Gauge(key) + gauge.send(None, value) + + def inc_gauge(self, key, amount=1): + """ + Increment gauge value. + """ + check_key(key) + assert isinstance(amount, Number) + + key = get_full_key_name(key) + gauge = statsd.Gauge(key) + gauge.increment(None, amount) + + def dec_gauge(self, key, amount=1): + """ + Decrement gauge value. + """ + check_key(key) + assert isinstance(amount, Number) + + key = get_full_key_name(key) + gauge = statsd.Gauge(key) + gauge.decrement(None, amount) diff --git a/st2common/st2common/constants/metrics.py b/st2common/st2common/metrics/utils.py similarity index 56% rename from st2common/st2common/constants/metrics.py rename to st2common/st2common/metrics/utils.py index 0604680d15..a790fc8d25 100644 --- a/st2common/st2common/constants/metrics.py +++ b/st2common/st2common/metrics/utils.py @@ -13,10 +13,33 @@ # See the License for the specific language governing permissions and # limitations under the License. -METRICS_COUNTER_SUFFIX = "_counter" -METRICS_TIMER_SUFFIX = "_timer" +import six +from oslo_config import cfg -PYTHON_RUNNER_EXECUTION = "python_runner_execution" -PYTHON_WRAPPER_EXECUTION = "python_wrapper_execution" +__all__ = [ + 'get_full_key_name', + 'check_key' +] -METRICS_REGISTER_RUNNER = "register_runner" + +def get_full_key_name(key): + """ + Return full metric key name, taking into account optional prefix which can be specified in the + config. + """ + parts = ['st2'] + + if cfg.CONF.metrics.prefix: + parts.append(cfg.CONF.metrics.prefix) + + parts.append(key) + + return '.'.join(parts) + + +def check_key(key): + """ + Ensure key meets requirements. + """ + assert isinstance(key, six.string_types), "Key not a string. Got %s" % type(key) + assert key, "Key cannot be empty string." diff --git a/st2common/st2common/middleware/instrumentation.py b/st2common/st2common/middleware/instrumentation.py new file mode 100644 index 0000000000..1def26f9e1 --- /dev/null +++ b/st2common/st2common/middleware/instrumentation.py @@ -0,0 +1,84 @@ +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__all__ = [ + 'RequestInstrumentationMiddleware', + 'ResponseInstrumentationMiddleware' +] + +from st2common.router import Request +from st2common.metrics.base import CounterWithTimer +from st2common.metrics.base import get_driver + + +class RequestInstrumentationMiddleware(object): + """ + Instrumentation middleware which records various request related metrics. + """ + + def __init__(self, app, service_name): + """ + :param service_name: Service name (e.g. api, stream, auth). + :type service_name: ``str`` + """ + self.app = app + self._service_name = service_name + + def __call__(self, environ, start_response): + request = Request(environ) + + metrics_driver = get_driver() + + key = '%s.request.total' % (self._service_name) + metrics_driver.inc_counter(key) + + key = '%s.request.method.%s' % (self._service_name, request.method) + metrics_driver.inc_counter(key) + + path = request.path.replace('/', '_') + key = '%s.request.path.%s' % (self._service_name, path) + metrics_driver.inc_counter(key) + + # Track and time current number of processing requests + key = '%s.request' % (self._service_name) + with CounterWithTimer(key=key): + return self.app(environ, start_response) + + +class ResponseInstrumentationMiddleware(object): + """ + Instrumentation middleware which records various response related metrics. + """ + + def __init__(self, app, service_name): + """ + :param service_name: Service name (e.g. api, stream, auth). + :type service_name: ``str`` + """ + self.app = app + self._service_name = service_name + + def __call__(self, environ, start_response): + # Track and time current number of processing requests + def custom_start_response(status, headers, exc_info=None): + status_code = int(status.split(' ')[0]) + + metrics_driver = get_driver() + metrics_driver.inc_counter('%s.response.status.%s' % (self._service_name, + status_code)) + + return start_response(status, headers, exc_info) + + return self.app(environ, custom_start_response) diff --git a/st2common/st2common/services/action.py b/st2common/st2common/services/action.py index a6747c1a73..02970e09eb 100644 --- a/st2common/st2common/services/action.py +++ b/st2common/st2common/services/action.py @@ -63,7 +63,8 @@ def create_request(liveaction): """ # We import this here to avoid conflicts w/ runners that might import this # file since the runners don't have the config context by default. - from st2common.metrics.base import get_driver, format_metrics_key + from st2common.metrics.base import get_driver + # Use the user context from the parent action execution. Subtasks in a workflow # action can be invoked by a system user and so we want to use the user context # from the original workflow action. @@ -135,12 +136,8 @@ def create_request(liveaction): trace_service.get_trace_component_for_action_execution(execution, liveaction) ]) - get_driver().inc_counter( - format_metrics_key( - action_db=action_db, - key='action.%s' % (liveaction.status) - ) - ) + get_driver().inc_counter('action.executions.%s' % (liveaction.status)) + return liveaction, execution diff --git a/st2common/st2common/util/action_db.py b/st2common/st2common/util/action_db.py index 5c3bbf915e..95c00466d6 100644 --- a/st2common/st2common/util/action_db.py +++ b/st2common/st2common/util/action_db.py @@ -30,7 +30,7 @@ from st2common.persistence.action import Action from st2common.persistence.liveaction import LiveAction from st2common.persistence.runner import RunnerType -from st2common.metrics.base import format_metrics_key, get_driver +from st2common.metrics.base import get_driver LOG = logging.getLogger(__name__) @@ -197,22 +197,12 @@ def update_liveaction_status(status=None, result=None, context=None, end_timesta # If liveaction_db status is set then we need to decrement the counter # because it is transitioning to a new state if liveaction_db.status: - get_driver().dec_counter( - format_metrics_key( - liveaction_db=liveaction_db, - key='action.%s' % liveaction_db.status - ) - ) + get_driver().dec_counter('action.executions.%s' % (liveaction_db.status)) # If status is provided then we need to increment the timer because the action # is transitioning into this new state if status: - get_driver().inc_counter( - format_metrics_key( - liveaction_db=liveaction_db, - key='action.%s' % status - ) - ) + get_driver().dec_counter('action.executions.%s' % (status)) extra = {'liveaction_db': liveaction_db} LOG.debug('Updating ActionExection: "%s" with status="%s"', liveaction_db.id, status, diff --git a/st2common/tests/unit/test_metrics.py b/st2common/tests/unit/test_metrics.py index 75977340ef..b33f5bdac3 100644 --- a/st2common/tests/unit/test_metrics.py +++ b/st2common/tests/unit/test_metrics.py @@ -20,12 +20,16 @@ from oslo_config import cfg from st2common.metrics import base +from st2common.metrics.utils import get_full_key_name from st2common.metrics.drivers.statsd_driver import StatsdDriver -from st2common.constants.metrics import METRICS_COUNTER_SUFFIX, METRICS_TIMER_SUFFIX from st2common.util.date import get_datetime_utc_now __all__ = [ - 'TestBaseMetricsDriver' + 'TestBaseMetricsDriver', + 'TestStatsDMetricsDriver', + 'TestCounterContextManager', + 'TestTimerContextManager', + 'TestCounterWithTimerContextManager' ] cfg.CONF.set_override('driver', 'noop', group='metrics') @@ -54,6 +58,8 @@ class TestStatsDMetricsDriver(unittest2.TestCase): @patch('st2common.metrics.drivers.statsd_driver.statsd') def setUp(self, statsd): + cfg.CONF.set_override(name='prefix', group='metrics', override=None) + self._driver = StatsdDriver() statsd.Connection.set_defaults.assert_called_once_with( @@ -66,14 +72,14 @@ def test_time(self): self._driver._timer = statsd.Timer('') params = ('test', 10) self._driver.time(*params) - statsd.Timer().send.assert_called_with(*params) + statsd.Timer().send.assert_called_with('st2.test', 10) def test_time_with_float(self): statsd = MagicMock() self._driver._timer = statsd.Timer('') params = ('test', 10.5) self._driver.time(*params) - statsd.Timer().send.assert_called_with(*params) + statsd.Timer().send.assert_called_with('st2.test', 10.5) def test_time_with_invalid_key(self): params = (2, 2) @@ -137,6 +143,60 @@ def test_dec_timer_with_invalid_amount(self): with self.assertRaises(AssertionError): self._driver.dec_counter(*params) + @patch('st2common.metrics.drivers.statsd_driver.statsd') + def test_set_gauge_success(self, statsd): + params = ('key', 100) + mock_gauge = MagicMock() + statsd.Gauge().send.side_effect = mock_gauge + self._driver.set_gauge(*params) + mock_gauge.assert_called_once_with(None, params[1]) + + @patch('st2common.metrics.drivers.statsd_driver.statsd') + def test_inc_gauge_success(self, statsd): + params = ('key1',) + mock_gauge = MagicMock() + statsd.Gauge().increment.side_effect = mock_gauge + self._driver.inc_gauge(*params) + mock_gauge.assert_called_once_with(None, 1) + + params = ('key2', 100) + mock_gauge = MagicMock() + statsd.Gauge().increment.side_effect = mock_gauge + self._driver.inc_gauge(*params) + mock_gauge.assert_called_once_with(None, params[1]) + + @patch('st2common.metrics.drivers.statsd_driver.statsd') + def test_dec_gauge_success(self, statsd): + params = ('key1',) + mock_gauge = MagicMock() + statsd.Gauge().decrement.side_effect = mock_gauge + self._driver.dec_gauge(*params) + mock_gauge.assert_called_once_with(None, 1) + + params = ('key2', 100) + mock_gauge = MagicMock() + statsd.Gauge().decrement.side_effect = mock_gauge + self._driver.dec_gauge(*params) + mock_gauge.assert_called_once_with(None, params[1]) + + def test_get_full_key_name(self): + # No prefix specified in the config + cfg.CONF.set_override(name='prefix', group='metrics', override=None) + + result = get_full_key_name('api.requests') + self.assertEqual(result, 'st2.api.requests') + + # Prefix is defined in the config + cfg.CONF.set_override(name='prefix', group='metrics', override='staging') + + result = get_full_key_name('api.requests') + self.assertEqual(result, 'st2.staging.api.requests') + + cfg.CONF.set_override(name='prefix', group='metrics', override='prod') + + result = get_full_key_name('api.requests') + self.assertEqual(result, 'st2.prod.api.requests') + class TestCounterContextManager(unittest2.TestCase): @patch('st2common.metrics.base.METRICS') @@ -145,7 +205,6 @@ def test_counter(self, metrics_patch): with base.Counter(test_key): metrics_patch.inc_counter.assert_called_with(test_key) metrics_patch.dec_counter.assert_not_called() - metrics_patch.dec_counter.assert_called_with(test_key) class TestTimerContextManager(unittest2.TestCase): @@ -209,8 +268,7 @@ def test_time(self, metrics_patch, datetime_patch): self.assertTrue(isinstance(timer._start_time, datetime)) metrics_patch.time.assert_not_called() timer.send_time() - metrics_patch.time.assert_called_with( - "%s%s" % (test_key, METRICS_TIMER_SUFFIX), + metrics_patch.time.assert_called_with(test_key, (self.end_time - self.middle_time).total_seconds() ) second_test_key = "lakshmi_has_a_nose" @@ -224,13 +282,10 @@ def test_time(self, metrics_patch, datetime_patch): time_delta.total_seconds(), (self.end_time - self.middle_time).total_seconds() ) - metrics_patch.inc_counter.assert_called_with( - "%s%s" % (test_key, METRICS_COUNTER_SUFFIX) - ) + metrics_patch.inc_counter.assert_called_with(test_key) metrics_patch.dec_counter.assert_not_called() - metrics_patch.dec_counter.assert_called_with("%s%s" % (test_key, METRICS_COUNTER_SUFFIX)) metrics_patch.time.assert_called_with( - "%s%s" % (test_key, METRICS_TIMER_SUFFIX), + test_key, (self.end_time - self.start_time).total_seconds() ) @@ -256,8 +311,7 @@ def _get_tested(metrics_counter_with_timer=None): self.assertTrue(isinstance(metrics_counter_with_timer._start_time, datetime)) metrics_patch.time.assert_not_called() metrics_counter_with_timer.send_time() - metrics_patch.time.assert_called_with( - "%s%s" % (test_key, METRICS_TIMER_SUFFIX), + metrics_patch.time.assert_called_with(test_key, (end_time - middle_time).total_seconds() ) second_test_key = "lakshmi_has_a_nose" @@ -271,16 +325,12 @@ def _get_tested(metrics_counter_with_timer=None): time_delta.total_seconds(), (end_time - middle_time).total_seconds() ) - metrics_patch.inc_counter.assert_called_with( - "%s%s" % (test_key, METRICS_COUNTER_SUFFIX) - ) + metrics_patch.inc_counter.assert_called_with(test_key) metrics_patch.dec_counter.assert_not_called() _get_tested() - metrics_patch.dec_counter.assert_called_with("%s%s" % (test_key, METRICS_COUNTER_SUFFIX)) - metrics_patch.time.assert_called_with( - "%s%s" % (test_key, METRICS_TIMER_SUFFIX), + metrics_patch.time.assert_called_with(test_key, (end_time - start_time).total_seconds() ) @@ -295,7 +345,6 @@ def _get_tested(): metrics_patch.inc_counter.assert_called_with(test_key) metrics_patch.dec_counter.assert_not_called() _get_tested() - metrics_patch.dec_counter.assert_called_with(test_key) class TestTimerDecorator(unittest2.TestCase): @@ -339,79 +388,3 @@ def _get_tested(metrics_timer=None): test_key, (end_time - start_time).total_seconds() ) - - -class TestFormatMetrics(unittest2.TestCase): - def test_format_metrics_liveaction_db_without_key(self): - pack = 'test' - action = 'lakface' - - liveaction_db = MagicMock() - liveaction_db.context = {'pack': pack} - liveaction_db.action = action - - key = base.format_metrics_key(liveaction_db=liveaction_db) - - self.assertEquals(key, "st2.%s.%s" % (pack, action)) - - def test_format_metrics_liveaction_db_with_key(self): - pack = 'test' - action = 'lakface' - test_key = 'meh' - - liveaction_db = MagicMock() - liveaction_db.context = {'pack': pack} - liveaction_db.action = "%s.%s" % (pack, action) - - key = base.format_metrics_key(liveaction_db=liveaction_db, key=test_key) - - self.assertEquals(key, "st2.%s.%s.%s" % (pack, action, test_key)) - - def test_format_metrics_liveaction_db_without_pack(self): - action = 'lakface' - pack = 'unknown' - - liveaction_db = MagicMock() - liveaction_db.context = {} - liveaction_db.action = "%s.%s" % (pack, action) - - key = base.format_metrics_key(liveaction_db=liveaction_db) - - self.assertEquals(key, "st2.%s.%s" % (pack, action)) - - def test_format_metrics_action_db_without_key(self): - pack = 'test' - action = 'lakface' - - action_db = MagicMock() - action_db.pack = pack - action_db.name = action - - key = base.format_metrics_key(action_db=action_db) - - self.assertEquals(key, "st2.%s.%s" % (pack, action)) - - def test_format_metrics_action_db_with_key(self): - pack = 'test' - action = 'lakface' - test_key = 'meh' - - action_db = MagicMock() - action_db.pack = pack - action_db.name = action - - key = base.format_metrics_key(action_db=action_db, key=test_key) - - self.assertEquals(key, "st2.%s.%s.%s" % (pack, action, test_key)) - - def test_format_metrics_action_db_without_pack(self): - action = 'lakface' - pack = 'unknown' - - action_db = MagicMock() - action_db.pack = None - action_db.name = action - - key = base.format_metrics_key(action_db=action_db) - - self.assertEquals(key, "st2.%s.%s" % (pack, action)) diff --git a/st2reactor/st2reactor/cmd/timersengine.py b/st2reactor/st2reactor/cmd/timersengine.py index 43ced30930..acf1e6932e 100644 --- a/st2reactor/st2reactor/cmd/timersengine.py +++ b/st2reactor/st2reactor/cmd/timersengine.py @@ -82,7 +82,7 @@ def main(): except SystemExit as exit_code: sys.exit(exit_code) except: - LOG.exception('(PID=%s) RulesEngine quit due to exception.', os.getpid()) + LOG.exception('(PID=%s) TimerEngine quit due to exception.', os.getpid()) return 1 finally: _teardown() diff --git a/st2reactor/st2reactor/rules/engine.py b/st2reactor/st2reactor/rules/engine.py index bf6c6f24d1..0731c3a1f3 100644 --- a/st2reactor/st2reactor/rules/engine.py +++ b/st2reactor/st2reactor/rules/engine.py @@ -19,10 +19,14 @@ from st2common.services.triggers import get_trigger_db_by_ref from st2reactor.rules.enforcer import RuleEnforcer from st2reactor.rules.matcher import RulesMatcher -from st2common.metrics.base import format_metrics_key, get_driver +from st2common.metrics.base import get_driver LOG = logging.getLogger('st2reactor.rules.RulesEngine') +__all__ = [ + 'RulesEngine' +] + class RulesEngine(object): def handle_trigger_instance(self, trigger_instance): @@ -70,14 +74,12 @@ def create_rule_enforcers(self, trigger_instance, matching_rules): This method is trigger_instance specific therefore if creation of 1 RuleEnforcer fails it is likely that all wil be broken. """ + metrics_driver = get_driver() + enforcers = [] for matching_rule in matching_rules: - - get_driver().inc_counter( - format_metrics_key( - key='rule.%s' % matching_rule - ) - ) + metrics_driver.inc_counter('rule.matched') + metrics_driver.inc_counter('rule.%s.matched' % (matching_rule.ref)) enforcers.append(RuleEnforcer(trigger_instance, matching_rule)) return enforcers diff --git a/st2reactor/st2reactor/rules/worker.py b/st2reactor/st2reactor/rules/worker.py index 78e0de9d41..709b239d4a 100644 --- a/st2reactor/st2reactor/rules/worker.py +++ b/st2reactor/st2reactor/rules/worker.py @@ -14,6 +14,7 @@ # limitations under the License. from __future__ import absolute_import + from kombu import Connection from st2common import log as logging @@ -26,7 +27,9 @@ import st2reactor.container.utils as container_utils from st2reactor.rules.engine import RulesEngine from st2common.transport.queues import RULESENGINE_WORK_QUEUE -from st2common.metrics.base import format_metrics_key, get_driver +from st2common.metrics.base import CounterWithTimer +from st2common.metrics.base import Timer +from st2common.metrics.base import get_driver LOG = logging.getLogger(__name__) @@ -58,16 +61,11 @@ def pre_ack_process(self, message): return self._compose_pre_ack_process_response(trigger_instance, message) def process(self, pre_ack_response): - trigger_instance, message = self._decompose_pre_ack_process_response(pre_ack_response) if not trigger_instance: raise ValueError('No trigger_instance provided for processing.') - get_driver().inc_counter( - format_metrics_key( - key='trigger.%s' % (trigger_instance.trigger) - ) - ) + get_driver().inc_counter('trigger.%s.processed' % (trigger_instance.trigger)) try: # Use trace_context from the message and if not found create a new context @@ -87,7 +85,11 @@ def process(self, pre_ack_response): container_utils.update_trigger_instance_status( trigger_instance, trigger_constants.TRIGGER_INSTANCE_PROCESSING) - self.rules_engine.handle_trigger_instance(trigger_instance) + + with CounterWithTimer(key='rule.processed'): + with Timer(key='triggerinstance.%s.processed' % (trigger_instance.id)): + self.rules_engine.handle_trigger_instance(trigger_instance) + container_utils.update_trigger_instance_status( trigger_instance, trigger_constants.TRIGGER_INSTANCE_PROCESSED) except: diff --git a/st2stream/st2stream/app.py b/st2stream/st2stream/app.py index 75ff60c636..5b106ca90d 100644 --- a/st2stream/st2stream/app.py +++ b/st2stream/st2stream/app.py @@ -31,6 +31,8 @@ from st2common.middleware.cors import CorsMiddleware from st2common.middleware.request_id import RequestIDMiddleware from st2common.middleware.logging import LoggingMiddleware +from st2common.middleware.instrumentation import RequestInstrumentationMiddleware +from st2common.middleware.instrumentation import ResponseInstrumentationMiddleware from st2common.router import Router from st2common.util.monkey_patch import monkey_patch from st2common.constants.system import VERSION_STRING @@ -77,6 +79,8 @@ def setup_app(config={}): app = ErrorHandlingMiddleware(app) app = CorsMiddleware(app) app = LoggingMiddleware(app, router) + app = ResponseInstrumentationMiddleware(app, service_name='stream') app = RequestIDMiddleware(app) + app = RequestInstrumentationMiddleware(app, service_name='stream') return app