diff --git a/Makefile b/Makefile index d335a76..ca0a66a 100644 --- a/Makefile +++ b/Makefile @@ -45,9 +45,9 @@ rel-agent: check-version check-python-vars .PHONY: rel-docker-test-app rel-docker-test-app: - docker build -t ghcr.io/intergral/deep-python-client:simple-app $(ROOT_DIR)/examples/simple-app-docker + docker build -t intergral/deep-python:latest $(ROOT_DIR)/examples/simple-app-docker - docker push ghcr.io/intergral/deep-python-client:simple-app + docker push intergral/deep-python:latest .PHONY: docs docs: diff --git a/deep-python-client.iml b/deep-python-client.iml index c475758..128d56c 100644 --- a/deep-python-client.iml +++ b/deep-python-client.iml @@ -8,6 +8,7 @@ + diff --git a/src/deep/api/deep.py b/src/deep/api/deep.py index 77ee659..0b557a4 100644 --- a/src/deep/api/deep.py +++ b/src/deep/api/deep.py @@ -17,6 +17,7 @@ from deep.config import ConfigService from deep.config.tracepoint_config import TracepointConfigService from deep.grpc import GRPCService +from deep.logging.tracepoint_logger import TracepointLogger from deep.poll import LongPoll from deep.processor import TriggerHandler from deep.push import PushService @@ -78,3 +79,6 @@ def get(self) -> TracePointConfig: def unregister(self): self._tpServ.remove_custom(self._cfg) + + def tracepoint_logger(self, logger: 'TracepointLogger'): + self.config.tracepoint_logger = logger diff --git a/src/deep/api/tracepoint/eventsnapshot.py b/src/deep/api/tracepoint/eventsnapshot.py index 8921c16..1b90a5d 100644 --- a/src/deep/api/tracepoint/eventsnapshot.py +++ b/src/deep/api/tracepoint/eventsnapshot.py @@ -11,7 +11,7 @@ # GNU Affero General Public License for more details. import random -from typing import List, Dict +from typing import List, Dict, Optional from deep.api.attributes import BoundedAttributes from deep.api.resource import Resource @@ -34,6 +34,7 @@ def __init__(self, tracepoint, ts, resource, frames, var_lookup: Dict[str, 'Vari self._duration_nanos = 0 self._resource = Resource.get_empty().merge(resource) self._open = True + self._log = None def complete(self): if not self._open: @@ -44,10 +45,13 @@ def complete(self): def is_open(self): return self._open - def add_watch_result(self, watch_result, watch_lookup): + def add_watch_result(self, watch_result: 'WatchResult'): if self.is_open(): self.watches.append(watch_result) - self._var_lookup.update(watch_lookup) + + def merge_var_lookup(self, lookup: Dict[str, 'Variable']): + if self.is_open(): + self._var_lookup.update(lookup) @property def id(self): @@ -85,6 +89,14 @@ def duration_nanos(self): def resource(self): return self._resource + @property + def log_msg(self): + return self._log + + @log_msg.setter + def log_msg(self, msg): + self._log = msg + def __str__(self) -> str: return str(self.__dict__) @@ -278,22 +290,22 @@ class WatchResult: """ def __init__(self, - expression, - result, - error=None + expression: str, + result: Optional['VariableId'], + error: Optional[str] = None ): self._expression = expression self._result = result self._error = error @property - def expression(self): + def expression(self) -> str: return self._expression @property - def result(self): + def result(self) -> Optional['VariableId']: return self._result @property - def error(self): + def error(self) -> Optional[str]: return self._error diff --git a/src/deep/api/tracepoint/tracepoint_config.py b/src/deep/api/tracepoint/tracepoint_config.py index 1521b86..53618b2 100644 --- a/src/deep/api/tracepoint/tracepoint_config.py +++ b/src/deep/api/tracepoint/tracepoint_config.py @@ -49,6 +49,9 @@ NO_STACK = 'no_stack' """Do not collect the stack data""" +LOG_MSG = 'log_msg' +"""The log message to interpolate at position of tracepoint""" + def frame_type_ordinal(frame_type) -> int: """ diff --git a/src/deep/config/config_service.py b/src/deep/config/config_service.py index 0be39c1..0fdeeb1 100644 --- a/src/deep/config/config_service.py +++ b/src/deep/config/config_service.py @@ -17,6 +17,7 @@ from deep.api.plugin import Plugin from deep.api.resource import Resource from deep.config.tracepoint_config import TracepointConfigService +from deep.logging.tracepoint_logger import DefaultLogger, TracepointLogger class ConfigService: @@ -33,6 +34,7 @@ def __init__(self, custom: Dict[str, any]): self.__custom = custom self._resource = None self._tracepoint_config = TracepointConfigService() + self._tracepoint_logger: 'TracepointLogger' = DefaultLogger(self) def __getattribute__(self, name: str) -> Any: """ @@ -95,8 +97,19 @@ def plugins(self, plugins): self._plugins = plugins @property - def tracepoints(self) -> TracepointConfigService: + def tracepoints(self) -> 'TracepointConfigService': return self._tracepoint_config def add_listener(self, listener): self._tracepoint_config.add_listener(listener) + + @property + def tracepoint_logger(self) -> 'TracepointLogger': + return self._tracepoint_logger + + @tracepoint_logger.setter + def tracepoint_logger(self, logger: 'TracepointLogger'): + self._tracepoint_logger = logger + + def log_tracepoint(self, log_msg: str, tp_id: str, snap_id: str): + self._tracepoint_logger.log_tracepoint(log_msg, tp_id, snap_id) diff --git a/src/deep/logging/tracepoint_logger.py b/src/deep/logging/tracepoint_logger.py new file mode 100644 index 0000000..c464203 --- /dev/null +++ b/src/deep/logging/tracepoint_logger.py @@ -0,0 +1,36 @@ +# Copyright (C) 2023 Intergral GmbH +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +import abc + +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from deep.config import ConfigService + +from deep import logging + + +class TracepointLogger(abc.ABC): + + @abc.abstractmethod + def log_tracepoint(self, log_msg: str, tp_id: str, snap_id: str): + pass + + +class DefaultLogger(TracepointLogger): + def __init__(self, _config: 'ConfigService'): + self._config = _config + + def log_tracepoint(self, log_msg: str, tp_id: str, snap_id: str): + logging.info(log_msg + " snapshot=%s tracepoint=%s" % (snap_id, tp_id)) diff --git a/src/deep/processor/frame_collector.py b/src/deep/processor/frame_collector.py index 820ce6c..545b4ad 100644 --- a/src/deep/processor/frame_collector.py +++ b/src/deep/processor/frame_collector.py @@ -11,7 +11,7 @@ # GNU Affero General Public License for more details. import abc -from typing import Dict, Tuple, Optional +from typing import Dict, Tuple, List, Optional from deep import logging from deep.api.tracepoint import StackFrame, WatchResult, Variable, VariableId @@ -19,6 +19,7 @@ from deep.utils import time_ns from .frame_config import FrameProcessorConfig from .variable_processor import process_variable, process_child_nodes, variable_to_string, truncate_string, Collector +from ..config import ConfigService class FrameCollector(Collector): @@ -26,7 +27,7 @@ class FrameCollector(Collector): This deals with collecting data from the paused frames. """ - def __init__(self, frame, config): + def __init__(self, frame, config: ConfigService): self._var_cache: Dict[str, str] = {} self._config = config self._has_time_exceeded = False @@ -47,17 +48,56 @@ def configure_self(self): def add_child_to_lookup(self, parent_id: str, child: VariableId): self._var_lookup[parent_id].children.append(child) - def eval_watch(self, watch): + def log_tracepoint(self, log_msg: str, tp_id: str, snap_id: str): + self._config.log_tracepoint(log_msg, tp_id, snap_id) + + def process_log(self, tp, log_msg) -> Tuple[str, List[WatchResult], Dict[str, Variable]]: + frame_col = self + watch_results = [] + _var_lookup = {} + + class FormatDict(dict): + """This type is used in the log process to ensure that missing values are formatted don't error""" + + def __missing__(self, key): + return "{%s}" % key + + import string + + class FormatExtractor(string.Formatter): + """This type allows us to use watches within log strings and collect the watch + as well as interpolate the values""" + + def get_field(self, field_name, args, kwargs): + # evaluate watch + watch, var_lookup, log_str = frame_col.eval_watch(field_name) + # collect data + watch_results.append(watch) + _var_lookup.update(var_lookup) + + return log_str, field_name + + log_msg = "[deep] %s" % FormatExtractor().vformat(log_msg, (), FormatDict(self._frame.f_locals)) + return log_msg, watch_results, _var_lookup + + def eval_watch(self, watch: str) -> Tuple[WatchResult, Dict[str, Variable], str]: + """ + Evaluate an expression in the current frame. + :param watch: The watch expression to evaluate. + :return: Tuple with WatchResult, collected variables, and the log string for the expression + """ # reset var lookup - var cache is still used to reduce duplicates self._var_lookup = {} try: result = eval(watch, None, self._frame.f_locals) - watch_var, var_lookup = self.process_watch_result_breadth_first(watch, result) - return WatchResult(watch, watch_var), var_lookup + watch_var, var_lookup, log_str = self.process_watch_result_breadth_first(watch, result) + # again we reset the local version of the var lookup. + self._var_lookup = {} + return WatchResult(watch, watch_var), var_lookup, log_str except BaseException as e: logging.exception("Error evaluating watch %s", watch) - return WatchResult(watch, None, str(e)), {} + return WatchResult(watch, None, str(e)), {}, str(e) def process_frame(self): """ @@ -72,7 +112,11 @@ def process_frame(self): frame = self._process_frame(current_frame, self._frame_config.should_collect_vars(len(collected_frames))) collected_frames.append(frame) current_frame = current_frame.f_back - return collected_frames, self._var_lookup + # We want to clear the local collected var lookup now that we have processed the frame + # this is, so we can process watches later while maintaining independence between tracepoints + _vars = self._var_lookup + self._var_lookup = {} + return collected_frames, _vars def _process_frame(self, frame, process_vars): # process the current frame info @@ -172,13 +216,14 @@ def check_var_count(self): return False return True - def process_watch_result_breadth_first(self, watch: str, result: any): + def process_watch_result_breadth_first(self, watch: str, result: any) -> ( + Tuple)[VariableId, Dict[str, Variable], str]: identity_hash_id = str(id(result)) check_id = self.check_id(identity_hash_id) if check_id is not None: # this means the watch result is already in the var_lookup - return VariableId(check_id, watch), {} + return VariableId(check_id, watch), {}, str(result) # else this is an unknown value so process breadth first var_ids = [] @@ -201,7 +246,7 @@ def add_child(self, child): self._var_lookup[var_id] = Variable(str(variable_type.__name__), variable_value_str, identity_hash_id, [], truncated) - return VariableId(var_id, watch), self._var_lookup + return VariableId(var_id, watch), self._var_lookup, str(result) def check_id(self, identity_hash_id): if identity_hash_id in self._var_cache: diff --git a/src/deep/processor/frame_processor.py b/src/deep/processor/frame_processor.py index 203c77d..32c0ab4 100644 --- a/src/deep/processor/frame_processor.py +++ b/src/deep/processor/frame_processor.py @@ -14,6 +14,7 @@ from deep import logging from deep.api.attributes import BoundedAttributes from deep.api.tracepoint import TracePointConfig, EventSnapshot +from deep.api.tracepoint.tracepoint_config import LOG_MSG from deep.config import ConfigService from deep.processor.frame_collector import FrameCollector @@ -43,8 +44,19 @@ def collect(self): snapshot = EventSnapshot(tp, self._ts, self._config.resource, stack, variables) # process the snapshot watches for watch in tp.watches: - result, watch_lookup = self.eval_watch(watch) - snapshot.add_watch_result(result, watch_lookup) + result, watch_lookup, _ = self.eval_watch(watch) + snapshot.add_watch_result(result) + snapshot.merge_var_lookup(watch_lookup) + + log_msg = tp.get_arg(LOG_MSG, None) + if log_msg is not None: + processed_log, watch_results, watch_lookup = self.process_log(tp, log_msg) + snapshot.log_msg = processed_log + for watch in watch_results: + snapshot.add_watch_result(watch) + snapshot.merge_var_lookup(watch_lookup) + self.log_tracepoint(processed_log, tp.id, format(snapshot.id, "016x")) + # process the snapshot attributes attributes = self.process_attributes(tp) snapshot.attributes.merge_in(attributes) diff --git a/src/deep/processor/trigger_handler.py b/src/deep/processor/trigger_handler.py index 6cf1c1b..63ac4c6 100644 --- a/src/deep/processor/trigger_handler.py +++ b/src/deep/processor/trigger_handler.py @@ -15,8 +15,10 @@ import sys import threading +from deep.config import ConfigService from deep.config.tracepoint_config import ConfigUpdateListener from deep.processor.frame_processor import FrameProcessor +from deep.push import PushService def add_or_get(target, key, default_value): @@ -51,7 +53,7 @@ class TriggerHandler: should collect data. """ - def __init__(self, config, push_service): + def __init__(self, config: ConfigService, push_service: PushService): self._push_service = push_service self._tp_config = [] self._config = config diff --git a/src/deep/push/__init__.py b/src/deep/push/__init__.py index 59b75bb..e913d6a 100644 --- a/src/deep/push/__init__.py +++ b/src/deep/push/__init__.py @@ -77,7 +77,8 @@ def convert_snapshot(snapshot: EventSnapshot) -> Snapshot: attributes=[KeyValue(key=k, value=convert_value(v)) for k, v in snapshot.attributes.items()], duration_nanos=snapshot.duration_nanos, resource=[KeyValue(key=k, value=convert_value(v)) for k, v in - snapshot.resource.attributes.items()]) + snapshot.resource.attributes.items()], + log_msg=snapshot.log_msg) except Exception: logging.exception("Error converting to protobuf") return Snapshot() diff --git a/test/test_deep/processor/__init__.py b/test/test_deep/processor/__init__.py index a22412a..80f521d 100644 --- a/test/test_deep/processor/__init__.py +++ b/test/test_deep/processor/__init__.py @@ -9,3 +9,9 @@ # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. + +class MockFrame: + def __init__(self, _locals=None): + if _locals is None: + _locals = {} + self.f_locals = _locals diff --git a/test/test_deep/processor/test_log_messages.py b/test/test_deep/processor/test_log_messages.py new file mode 100644 index 0000000..29c88c1 --- /dev/null +++ b/test/test_deep/processor/test_log_messages.py @@ -0,0 +1,45 @@ +# Copyright (C) 2023 Intergral GmbH +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +import unittest + +from parameterized import parameterized + +from deep.config import ConfigService +from deep.processor.frame_processor import FrameProcessor +from test_deep.processor import MockFrame + + +class TestLogMessages(unittest.TestCase): + @parameterized.expand([ + ["some log message", "[deep] some log message", {}, []], + ["some log message: {name}", "[deep] some log message: bob", {'name': 'bob'}, ['bob']], + ["some log message: {len(name)}", "[deep] some log message: 3", {'name': 'bob'}, ['3']], + ["some log message: {person}", "[deep] some log message: {'name': 'bob'}", + {'person': {'name': 'bob'}}, ["Size: 1"]], + ["some log message: {person.name}", "[deep] some log message: 'dict' object has no attribute 'name'", + {'person': {'name': 'bob'}}, ["'dict' object has no attribute 'name'"]], + ["some log message: {person['name']}", "[deep] some log message: bob", {'person': {'name': 'bob'}}, ["bob"]], + ]) + def test_simple_log_interpolation(self, log_msg, expected_msg, _locals, expected_watches): + processor = FrameProcessor([], MockFrame(_locals), ConfigService({})) + processor.configure_self() + log, watches, _vars = processor.process_log({}, log_msg) + self.assertEqual(expected_msg, log) + self.assertEqual(len(expected_watches), len(watches)) + for i, watch in enumerate(watches): + if watch.error is None: + self.assertEqual(_vars[watch.result.vid].value, expected_watches[i]) + else: + self.assertEqual(watch.error, expected_watches[i])