From ae30a18db5d8689366d5827a93c60314711416ee Mon Sep 17 00:00:00 2001 From: Artem Batalov Date: Mon, 4 Apr 2022 15:43:57 +0300 Subject: [PATCH] Access to monitoring singleton only via monitoring module --- core/db_adapter/aioredis_adapter.py | 8 ++++---- core/db_adapter/aioredis_sentinel_adapter.py | 8 ++++---- core/db_adapter/ceph/ceph_adapter.py | 4 ++-- core/db_adapter/db_adapter.py | 16 ++++++++-------- core/db_adapter/ignite_adapter.py | 6 +++--- core/mq/kafka/async_kafka_publisher.py | 6 +++--- core/mq/kafka/kafka_consumer.py | 6 +++--- core/mq/kafka/kafka_publisher.py | 10 +++++----- core/request/rest_request.py | 4 ++-- core/unified_template/unified_template.py | 4 ++-- core/utils/rerunable.py | 4 ++-- .../form_filling_scenario.py | 4 ++-- .../tree_scenario/tree_scenario.py | 4 ++-- smart_kit/start_points/base_main_loop.py | 5 ++--- smart_kit/utils/monitoring.py | 12 ++++++------ 15 files changed, 50 insertions(+), 51 deletions(-) diff --git a/core/db_adapter/aioredis_adapter.py b/core/db_adapter/aioredis_adapter.py index 38e498b9..b85d6084 100644 --- a/core/db_adapter/aioredis_adapter.py +++ b/core/db_adapter/aioredis_adapter.py @@ -5,7 +5,7 @@ from core.db_adapter.db_adapter import AsyncDBAdapter from core.db_adapter import error -from core.monitoring.monitoring import monitoring +from core.monitoring import monitoring from core.logging.logger_utils import log @@ -21,15 +21,15 @@ def __init__(self, config=None): except KeyError: pass - @monitoring.got_histogram_decorate("save_time") + @monitoring.monitoring.got_histogram_decorate("save_time") async def save(self, id, data): return await self._async_run(self._save, id, data) - @monitoring.got_histogram_decorate("save_time") + @monitoring.monitoring.got_histogram_decorate("save_time") async def replace_if_equals(self, id, sample, data): return await self._async_run(self._replace_if_equals, id, sample, data) - @monitoring.got_histogram_decorate("get_time") + @monitoring.monitoring.got_histogram_decorate("get_time") async def get(self, id): return await self._async_run(self._get, id) diff --git a/core/db_adapter/aioredis_sentinel_adapter.py b/core/db_adapter/aioredis_sentinel_adapter.py index 3f890df3..5d01a59f 100644 --- a/core/db_adapter/aioredis_sentinel_adapter.py +++ b/core/db_adapter/aioredis_sentinel_adapter.py @@ -5,7 +5,7 @@ from aioredis.sentinel import Sentinel from core.db_adapter.db_adapter import AsyncDBAdapter from core.db_adapter import error -from core.monitoring.monitoring import monitoring +from core.monitoring import monitoring from core.logging.logger_utils import log @@ -22,16 +22,16 @@ def __init__(self, config=None): except KeyError: pass - @monitoring.got_histogram_decorate("save_time") + @monitoring.monitoring.got_histogram_decorate("save_time") async def save(self, id, data): return await self._async_run(self._save, id, data) - @monitoring.got_histogram_decorate("save_time") + @monitoring.monitoring.got_histogram_decorate("save_time") async def replace_if_equals(self, id, sample, data): return await self._async_run(self._replace_if_equals, id, sample, data) - @monitoring.got_histogram_decorate("get_time") + @monitoring.monitoring.got_histogram_decorate("get_time") async def get(self, id): return await self._async_run(self._get, id) diff --git a/core/db_adapter/ceph/ceph_adapter.py b/core/db_adapter/ceph/ceph_adapter.py index 97d91bb2..ce75c8b9 100644 --- a/core/db_adapter/ceph/ceph_adapter.py +++ b/core/db_adapter/ceph/ceph_adapter.py @@ -9,7 +9,7 @@ from core.db_adapter.ceph.ceph_io import CephIO from core.db_adapter.db_adapter import DBAdapter from core.logging.logger_utils import log -from core.monitoring.monitoring import monitoring +from core.monitoring import monitoring ssl._create_default_https_context = ssl._create_unverified_context @@ -35,7 +35,7 @@ def connect(self): params={log_const.KEY_NAME: log_const.HANDLED_EXCEPTION_VALUE}, level="ERROR", exc_info=True) - monitoring.got_counter("ceph_connection_exception") + monitoring.monitoring.got_counter("ceph_connection_exception") raise @property diff --git a/core/db_adapter/db_adapter.py b/core/db_adapter/db_adapter.py index 0ff718c3..9582dd85 100644 --- a/core/db_adapter/db_adapter.py +++ b/core/db_adapter/db_adapter.py @@ -5,7 +5,7 @@ from core.logging.logger_utils import log from core.model.factory import build_factory from core.model.registered import Registered -from core.monitoring.monitoring import monitoring +from core.monitoring import monitoring from core.utils.rerunable import Rerunable db_adapters = Registered() @@ -65,15 +65,15 @@ def path_exists(self, path): def mtime(self, path): return self._run(self._mtime, path) - @monitoring.got_histogram_decorate("save_time") + @monitoring.monitoring.got_histogram_decorate("save_time") def save(self, id, data): return self._run(self._save, id, data) - @monitoring.got_histogram_decorate("save_time") + @monitoring.monitoring.got_histogram_decorate("save_time") def replace_if_equals(self, id, sample, data): return self._run(self._replace_if_equals, id, sample, data) - @monitoring.got_histogram_decorate("get_time") + @monitoring.monitoring.got_histogram_decorate("get_time") def get(self, id): return self._run(self._get, id) @@ -109,15 +109,15 @@ async def _path_exists(self, path): async def path_exists(self, path): return await self._async_run(self._path_exists, path) - @monitoring.got_histogram("save_time") + @monitoring.monitoring.got_histogram("save_time") async def save(self, id, data): return await self._async_run(self._save, id, data) - @monitoring.got_histogram("save_time") + @monitoring.monitoring.got_histogram("save_time") async def replace_if_equals(self, id, sample, data): return await self._async_run(self._replace_if_equals, id, sample, data) - @monitoring.got_histogram("get_time") + @monitoring.monitoring.got_histogram("get_time") async def get(self, id): return await self._async_run(self._get, id) @@ -144,5 +144,5 @@ async def _async_run(self, action, *args, _try_count=None, **kwargs): result = await self._async_run(action, *args, _try_count=_try_count, **kwargs) counter_name = self._get_counter_name() if counter_name: - monitoring.got_counter(f"{counter_name}_exception") + monitoring.monitoring.got_counter(f"{counter_name}_exception") return result diff --git a/core/db_adapter/ignite_adapter.py b/core/db_adapter/ignite_adapter.py index b5405ec3..53d6795c 100644 --- a/core/db_adapter/ignite_adapter.py +++ b/core/db_adapter/ignite_adapter.py @@ -11,7 +11,7 @@ from core.db_adapter import error from core.db_adapter.db_adapter import AsyncDBAdapter from core.logging.logger_utils import log -from core.monitoring.monitoring import monitoring +from core.monitoring import monitoring class IgniteAdapter(AsyncDBAdapter): @@ -55,7 +55,7 @@ async def connect(self): params={log_const.KEY_NAME: log_const.HANDLED_EXCEPTION_VALUE}, level="ERROR", exc_info=True) - monitoring.got_counter("ignite_connection_exception") + monitoring.monitoring.got_counter("ignite_connection_exception") raise async def _save(self, id, data): @@ -75,7 +75,7 @@ async def get_cache(self): if self._client is None: log('Attempt to recreate ignite instance', level="WARNING") await self.connect() - monitoring.got_counter("ignite_reconnection") + monitoring.monitoring.got_counter("ignite_reconnection") return self._cache @property diff --git a/core/mq/kafka/async_kafka_publisher.py b/core/mq/kafka/async_kafka_publisher.py index 76ad269b..5bf271ae 100644 --- a/core/mq/kafka/async_kafka_publisher.py +++ b/core/mq/kafka/async_kafka_publisher.py @@ -3,7 +3,7 @@ import core.logging.logger_constants as log_const from core.logging.logger_utils import log -from core.monitoring.monitoring import monitoring +from core.monitoring import monitoring from core.mq.kafka.kafka_publisher import KafkaPublisher @@ -30,7 +30,7 @@ def send(self, value, key=None, topic_key=None, headers=None): } log("KafkaProducer: Local producer queue is full (%(queue_amount)s messages awaiting delivery):" " try again\n", params=params, level="ERROR") - monitoring.got_counter("kafka_producer_exception") + monitoring.monitoring.got_counter("kafka_producer_exception") def send_to_topic(self, value, key=None, topic=None, headers=None): try: @@ -52,7 +52,7 @@ def send_to_topic(self, value, key=None, topic=None, headers=None): } log("KafkaProducer: Local producer queue is full (%(queue_amount)s messages awaiting delivery):" " try again\n", params=params, level="ERROR") - monitoring.got_counter("kafka_producer_exception") + monitoring.monitoring.got_counter("kafka_producer_exception") def _poll_for_callbacks(self): poll_timeout = self._config.get("poll_timeout", 1) diff --git a/core/mq/kafka/kafka_consumer.py b/core/mq/kafka/kafka_consumer.py index afd0cab4..6e8d22d1 100644 --- a/core/mq/kafka/kafka_consumer.py +++ b/core/mq/kafka/kafka_consumer.py @@ -10,7 +10,7 @@ import core.logging.logger_constants as log_const from core.logging.logger_utils import log -from core.monitoring.monitoring import monitoring +from core.monitoring import monitoring from core.mq.kafka.base_kafka_consumer import BaseKafkaConsumer @@ -102,7 +102,7 @@ def _error_callback(self, err): log_const.KEY_NAME: log_const.EXCEPTION_VALUE } log("KafkaConsumer: Error: %(error)s", params=params, level="WARNING") - monitoring.got_counter("kafka_consumer_exception") + monitoring.monitoring.got_counter("kafka_consumer_exception") # noinspection PyMethodMayBeStatic def _process_message(self, msg: KafkaMessage): @@ -111,7 +111,7 @@ def _process_message(self, msg: KafkaMessage): if err.code() == KafkaError._PARTITION_EOF: return None else: - monitoring.got_counter("kafka_consumer_exception") + monitoring.monitoring.got_counter("kafka_consumer_exception") params = { "code": err.code(), "pid": os.getpid(), diff --git a/core/mq/kafka/kafka_publisher.py b/core/mq/kafka/kafka_publisher.py index 51ba44d7..f87e6dca 100644 --- a/core/mq/kafka/kafka_publisher.py +++ b/core/mq/kafka/kafka_publisher.py @@ -7,7 +7,7 @@ import core.logging.logger_constants as log_const from core.logging.logger_utils import log -from core.monitoring.monitoring import monitoring +from core.monitoring import monitoring from core.mq.kafka.base_kafka_publisher import BaseKafkaPublisher @@ -41,7 +41,7 @@ def send(self, value, key=None, topic_key=None, headers=None): } log("KafkaProducer: Local producer queue is full (%(queue_amount)s messages awaiting delivery):" " try again\n", params=params, level="ERROR") - monitoring.got_counter("kafka_producer_exception") + monitoring.monitoring.got_counter("kafka_producer_exception") self._poll() def send_to_topic(self, value, key=None, topic=None, headers=None): @@ -64,7 +64,7 @@ def send_to_topic(self, value, key=None, topic=None, headers=None): } log("KafkaProducer: Local producer queue is full (%(queue_amount)s messages awaiting delivery):" " try again\n", params=params, level="ERROR") - monitoring.got_counter("kafka_producer_exception") + monitoring.monitoring.got_counter("kafka_producer_exception") self._poll() def _poll(self): @@ -81,7 +81,7 @@ def _error_callback(self, err): log_const.KEY_NAME: log_const.EXCEPTION_VALUE } log("KafkaProducer: Error: %(error)s", params=params, level="ERROR") - monitoring.got_counter("kafka_producer_exception") + monitoring.monitoring.got_counter("kafka_producer_exception") def _delivery_callback(self, err, msg): if err: @@ -101,7 +101,7 @@ def _delivery_callback(self, err, msg): log_const.KEY_NAME: log_const.EXCEPTION_VALUE}, level="ERROR", exc_info=True) - monitoring.got_counter("kafka_producer_exception") + monitoring.monitoring.got_counter("kafka_producer_exception") def close(self): self._producer.flush(self._config["flush_timeout"]) diff --git a/core/request/rest_request.py b/core/request/rest_request.py index b92b0018..f787614e 100644 --- a/core/request/rest_request.py +++ b/core/request/rest_request.py @@ -2,7 +2,7 @@ from timeout_decorator import timeout_decorator from core.request.base_request import BaseRequest from core.utils.exception_handlers import exc_handler -from core.monitoring.monitoring import monitoring +from core.monitoring import monitoring class RestNoMethodSpecifiedException(Exception): @@ -37,7 +37,7 @@ def run(self, data, params=None): return method(data) def on_timeout_error(self, *args, **kwarg): - monitoring.got_counter("core_rest_run_timeout") + monitoring.monitoring.got_counter("core_rest_run_timeout") def _requests_get(self, params): return requests.get(self.url, params=params, **self.rest_args).text diff --git a/core/unified_template/unified_template.py b/core/unified_template/unified_template.py index 1832ba03..3cd2191d 100644 --- a/core/unified_template/unified_template.py +++ b/core/unified_template/unified_template.py @@ -5,7 +5,7 @@ import core.logging.logger_constants as log_const from core.logging.logger_utils import log -from core.monitoring.monitoring import monitoring +from core.monitoring import monitoring UNIFIED_TEMPLATE_TYPE_NAME = "unified_template" @@ -49,7 +49,7 @@ def render(self, *args, **kwargs): "params_dict_str": str(params_dict)}, level="ERROR", exc_info=True) - monitoring.got_counter("core_jinja_template_error") + monitoring.monitoring.got_counter("core_jinja_template_error") raise return result diff --git a/core/utils/rerunable.py b/core/utils/rerunable.py index 369a6ee8..431cf7b3 100644 --- a/core/utils/rerunable.py +++ b/core/utils/rerunable.py @@ -2,7 +2,7 @@ import core.logging.logger_constants as log_const from core.logging.logger_utils import log -from core.monitoring.monitoring import monitoring +from core.monitoring import monitoring class Rerunable(): @@ -44,7 +44,7 @@ def _run(self, action, *args, _try_count=None, **kwargs): result = self._run(action, *args, _try_count=_try_count, **kwargs) counter_name = self._get_counter_name() if counter_name: - monitoring.got_counter(f"{counter_name}_exception") + monitoring.monitoring.got_counter(f"{counter_name}_exception") return result def _get_counter_name(self): diff --git a/scenarios/scenario_descriptions/form_filling_scenario.py b/scenarios/scenario_descriptions/form_filling_scenario.py index fc31558a..9fe34b3f 100644 --- a/scenarios/scenario_descriptions/form_filling_scenario.py +++ b/scenarios/scenario_descriptions/form_filling_scenario.py @@ -2,7 +2,7 @@ from typing import Dict, Any from core.basic_models.scenarios.base_scenario import BaseScenario -from core.monitoring.monitoring import monitoring +from core.monitoring import monitoring from core.logging.logger_utils import log import scenarios.logging.logger_constants as log_const @@ -174,7 +174,7 @@ async def get_reply(self, user, text_preprocessing_result, reply_actions, field, user.last_scenarios.delete(self.id) return action_messages - @monitoring.got_histogram_decorate("scenario_time") + @monitoring.monitoring.got_histogram_decorate("scenario_time") async def run(self, text_preprocessing_result, user, params: Dict[str, Any] = None): form = self._get_form(user) user.last_scenarios.add(self.id, text_preprocessing_result) diff --git a/scenarios/scenario_descriptions/tree_scenario/tree_scenario.py b/scenarios/scenario_descriptions/tree_scenario/tree_scenario.py index a756d02a..f69c797b 100644 --- a/scenarios/scenario_descriptions/tree_scenario/tree_scenario.py +++ b/scenarios/scenario_descriptions/tree_scenario/tree_scenario.py @@ -4,7 +4,7 @@ from scenarios.scenario_descriptions.form_filling_scenario import FormFillingScenario from scenarios.scenario_descriptions.tree_scenario.tree_scenario_node import TreeScenarioNode from core.model.factory import dict_factory -from core.monitoring.monitoring import monitoring +from core.monitoring import monitoring from core.logging.logger_utils import log import scenarios.logging.logger_constants as log_const from scenarios.scenario_models.history import Event, HistoryConstants @@ -83,7 +83,7 @@ def get_fields_data(self, main_form, form_type): all_forms_fields.update(form_field_data) return all_forms_fields - @monitoring.got_histogram_decorate("scenario_time") + @monitoring.monitoring.got_histogram_decorate("scenario_time") async def run(self, text_preprocessing_result, user, params: Dict[str, Any] = None): main_form = self._get_form(user) user.last_scenarios.add(self.id, text_preprocessing_result) diff --git a/smart_kit/start_points/base_main_loop.py b/smart_kit/start_points/base_main_loop.py index 2e085d99..9b301612 100644 --- a/smart_kit/start_points/base_main_loop.py +++ b/smart_kit/start_points/base_main_loop.py @@ -8,7 +8,7 @@ from core.db_adapter.db_adapter import DBAdapterException from core.db_adapter.db_adapter import db_adapter_factory from core.logging.logger_utils import log -from core.monitoring.monitoring import monitoring +from core.monitoring import monitoring from core.monitoring.healthcheck_handler import RootResource from core.monitoring.twisted_server import TwistedServer from core.model.base_user import BaseUser @@ -95,8 +95,7 @@ def _create_health_check_server(self, settings): def _init_monitoring_config(self, template_settings): monitoring_config = template_settings["monitoring"] - monitoring.apply_config(monitoring_config) - smart_kit_metrics.apply_config(monitoring_config) + monitoring.monitoring.apply_config(monitoring_config) smart_kit_metrics.init_metrics(app_name=self.app_name) async def load_user(self, db_uid, message): diff --git a/smart_kit/utils/monitoring.py b/smart_kit/utils/monitoring.py index 2b981fd1..4099ff25 100644 --- a/smart_kit/utils/monitoring.py +++ b/smart_kit/utils/monitoring.py @@ -1,6 +1,6 @@ from core.logging.logger_constants import KEY_NAME from core.logging.logger_utils import log -from core.monitoring.monitoring import monitoring +from core.monitoring import monitoring def _filter_monitoring_msg(msg): @@ -39,7 +39,7 @@ def init_metrics(self, app_name): "Incoming message validation error.") def _get_or_create_counter(self, monitoring_msg, descr, labels=()): - counter = monitoring.get_counter(monitoring_msg, descr, labels) + counter = monitoring.monitoring.get_counter(monitoring_msg, descr, labels) if counter is None: raise MetricDisabled('counter disabled') return counter @@ -182,22 +182,22 @@ def counter_mq_long_waiting(self, app_name): @silence_it def sampling_load_time(self, app_name, value): monitoring_msg = "{}_load_time".format(app_name) - monitoring.got_histogram_observe(_filter_monitoring_msg(monitoring_msg), value) + monitoring.monitoring.got_histogram_observe(_filter_monitoring_msg(monitoring_msg), value) @silence_it def sampling_script_time(self, app_name, value): monitoring_msg = "{}_script_time".format(app_name) - monitoring.got_histogram_observe(_filter_monitoring_msg(monitoring_msg), value) + monitoring.monitoring.got_histogram_observe(_filter_monitoring_msg(monitoring_msg), value) @silence_it def sampling_save_time(self, app_name, value): monitoring_msg = "{}_save_time".format(app_name) - monitoring.got_histogram_observe(_filter_monitoring_msg(monitoring_msg), value) + monitoring.monitoring.got_histogram_observe(_filter_monitoring_msg(monitoring_msg), value) @silence_it def sampling_mq_waiting_time(self, app_name, value): monitoring_msg = "{}_mq_waiting_time".format(app_name) - monitoring.got_histogram_observe(_filter_monitoring_msg(monitoring_msg), value) + monitoring.monitoring.got_histogram_observe(_filter_monitoring_msg(monitoring_msg), value) smart_kit_metrics = Metrics()