diff --git a/CHANGELOG.md b/CHANGELOG.md index cdee4b0..bad074d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - **[FEATURE]**: feat(logging): initial implementation of log points [#3](https://github.com/intergral/deep/pull/3) [@Umaaz](https://github.com/Umaaz) - **[FEATURE]**: feat(plugins): change plugins to allow better customisation [#22](https://github.com/intergral/deep/pull/22) [@Umaaz](https://github.com/Umaaz) - **[FEATURE]**: feat(metrics): initial implementation of metric points [#21](https://github.com/intergral/deep/pull/21) [@Umaaz](https://github.com/Umaaz) +- **[FEATURE]**: feat(spans): initial implementation of span points [#25](https://github.com/intergral/deep/pull/25) [@Umaaz](https://github.com/Umaaz) - **[ENHANCEMENT]**: enhancement(trigger): change tracepoint handling to use triggers [#16](https://github.com/intergral/deep/pull/16) [@Umaaz](https://github.com/Umaaz) - **[BUGFIX]**: feat(api): add api function to register tracepoint directly [#8](https://github.com/intergral/deep/pull/8) [@Umaaz](https://github.com/Umaaz) @@ -14,8 +15,8 @@ \ No newline at end of file diff --git a/Makefile b/Makefile index eb13b17..07fe739 100644 --- a/Makefile +++ b/Makefile @@ -24,7 +24,7 @@ it-test: .PHONY: coverage coverage: - pytest tests/unit_tests --cov=deep --cov-report term --cov-fail-under=83 --cov-report html --cov-branch + pytest tests/unit_tests --cov=deep --cov-report term --cov-fail-under=84 --cov-report html --cov-branch .PHONY: lint lint: diff --git a/examples/simple-app-otel/src/simple-app/main.py b/examples/simple-app-otel/src/simple-app/main.py index d02f4fb..719690a 100644 --- a/examples/simple-app-otel/src/simple-app/main.py +++ b/examples/simple-app-otel/src/simple-app/main.py @@ -19,12 +19,15 @@ import time from opentelemetry import trace -from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.exporter.jaeger.thrift import JaegerExporter from opentelemetry.sdk.resources import 
SERVICE_NAME, Resource from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.sdk.trace.export import ( + SimpleSpanProcessor, +) import deep +from deep.api.tracepoint.constants import METHOD_NAME, SPAN, METHOD, FIRE_COUNT, SNAPSHOT, NO_COLLECT from simple_test import SimpleTest @@ -63,16 +66,26 @@ def main(): resource = Resource(attributes={ SERVICE_NAME: "your-service-name" }) + + jaeger_exporter = JaegerExporter( + agent_host_name="localhost", + agent_port=6831 + ) + provider = TracerProvider(resource=resource) - processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317/api/traces")) - provider.add_span_processor(processor) + provider.add_span_processor(SimpleSpanProcessor(jaeger_exporter)) # Sets the global default tracer provider trace.set_tracer_provider(provider) - deep.start({ + _deep = deep.start({ 'SERVICE_URL': 'localhost:43315', 'SERVICE_SECURE': 'False', + 'POLL_TIMER': 10000 }) + tracepoint = _deep.register_tracepoint("simple_test.py", -1, + {METHOD_NAME: 'message', SPAN: METHOD, FIRE_COUNT: '-1', + SNAPSHOT: NO_COLLECT}) + print("app running") main() diff --git a/src/deep/api/plugin/otel.py b/src/deep/api/plugin/otel.py index b0ba8a0..efc2258 100644 --- a/src/deep/api/plugin/otel.py +++ b/src/deep/api/plugin/otel.py @@ -17,8 +17,10 @@ from typing import Optional +import deep.logging from deep.api.attributes import BoundedAttributes from deep.api.plugin import DidNotEnable, SnapshotDecorator, ResourceProvider +from deep.api.plugin.span import SpanProcessor, Span from deep.api.resource import Resource from deep.processor.context.action_context import ActionContext @@ -30,13 +32,92 @@ raise DidNotEnable("opentelemetry is not installed", e) -class OTelPlugin(ResourceProvider, SnapshotDecorator): +class _OtelSpan(Span): + """Wrap Otel span in common interface.""" + + def __init__(self, proxy: _Span): + """ + Create a new wrapper for Otel span. 
+ + :param proxy: the underlying otel span + """ + self.proxy = proxy + + @property + def name(self): + """Get the span name.""" + return self.proxy.name + + @property + def trace_id(self): + """Get the trace id.""" + return self.__format_trace_id(self.proxy.context.trace_id) + + @property + def span_id(self): + """Get the span id.""" + return self.__format_span_id(self.proxy.context.span_id) + + def add_attribute(self, key: str, value: str): + """ + Add an attribute to the span. + + :param key: the attribute key + :param value: the attribute value + """ + self.proxy.set_attribute(key, value) + + def close(self): + """Close the span.""" + if not self.proxy.end_time: + try: + self.proxy.end() + except Exception: + deep.logging.exception("failed to close span") + + @staticmethod + def __format_span_id(_id): + return format(_id, "016x") + + @staticmethod + def __format_trace_id(_id): + return format(_id, "032x") + + +class OTelPlugin(ResourceProvider, SnapshotDecorator, SpanProcessor): """ Deep Otel plugin. Provide span and trace information to the snapshot. """ + def create_span(self, name: str) -> Optional['Span']: + """ + Create and return a new span. + + :param name: the name of the span to create + :return: the created span + """ + span = trace.get_tracer("deep").start_as_current_span(name, end_on_exit=False, attributes={'dynamic': 'deep'}) + if span: + # noinspection PyUnresolvedReferences + # this is a generator contextlib._GeneratorContextManager + current = span.__enter__() + if isinstance(current, _Span): + return _OtelSpan(current) + return None + + def current_span(self) -> Optional['Span']: + """ + Get the current span from the underlying provider. + + :return: the current span + """ + span = self.__get_span() + if span: + return _OtelSpan(span) + return None + def resource(self) -> Optional[Resource]: """ Provide resource. 
@@ -59,42 +140,18 @@ def decorate(self, context: ActionContext) -> Optional[BoundedAttributes]: :return: the additional attributes to attach """ - span = OTelPlugin.__get_span() + span = self.current_span() if span is not None: return BoundedAttributes(attributes={ - "span_name": OTelPlugin.__span_name(span), - "trace_id": OTelPlugin.__trace_id(span), - "span_id": OTelPlugin.__span_id(span) + "span_name": span.name, + "trace_id": span.trace_id, + "span_id": span.span_id }) return None @staticmethod - def __span_name(span): - # type: (_Span)-> Optional[str] - return span.name if span.name else None - - @staticmethod - def __span_id(span): - # type: (_Span)-> Optional[str] - return (OTelPlugin.__format_span_id(span.context.span_id)) if span else None - - @staticmethod - def __trace_id(span): - # type: (_Span)-> Optional[str] - return (OTelPlugin.__format_trace_id(span.context.trace_id)) if span else None - - @staticmethod - def __get_span(): - # type: () -> Optional[_Span] + def __get_span() -> Optional[_Span]: span = trace.get_current_span() if isinstance(span, _Span): return span return None - - @staticmethod - def __format_span_id(_id): - return format(_id, "016x") - - @staticmethod - def __format_trace_id(_id): - return format(_id, "032x") diff --git a/src/deep/api/plugin/span/__init__.py b/src/deep/api/plugin/span/__init__.py new file mode 100644 index 0000000..4d68be8 --- /dev/null +++ b/src/deep/api/plugin/span/__init__.py @@ -0,0 +1,85 @@ +# Copyright (C) 2024 Intergral GmbH +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. + +""" +Definition of span processor. + +Span processor gives the ability to generate spans dynamically. +""" + +import abc +from typing import Optional + +from deep.api.plugin import Plugin + + +class SpanProcessor(Plugin, abc.ABC): + """Span processor connects Deep to a span provider.""" + + @abc.abstractmethod + def create_span(self, name: str) -> Optional['Span']: + """ + Create and return a new span. + + :param name: the name of the span to create + :return: the created span + """ + pass + + @abc.abstractmethod + def current_span(self) -> Optional['Span']: + """ + Get the current span from the underlying provider. + + :return: the current span + """ + pass + + +class Span: + """Internal type to wrap spans.""" + + @property + @abc.abstractmethod + def name(self): + """Get the span name.""" + pass + + @property + @abc.abstractmethod + def trace_id(self): + """Get the trace id.""" + pass + + @property + @abc.abstractmethod + def span_id(self): + """Get the span id.""" + pass + + @abc.abstractmethod + def add_attribute(self, key: str, value: str): + """ + Add an attribute to the span.
+ + :param key: the attribute key + :param value: the attribute value + """ + pass + + @abc.abstractmethod + def close(self): + """Close the span.""" + pass diff --git a/src/deep/api/tracepoint/tracepoint_config.py b/src/deep/api/tracepoint/tracepoint_config.py index 10910b8..1ea190c 100644 --- a/src/deep/api/tracepoint/tracepoint_config.py +++ b/src/deep/api/tracepoint/tracepoint_config.py @@ -213,6 +213,9 @@ def path(self): @property def line_no(self): """The tracepoint line number.""" + # todo need to support missing line number in grpc + if self._line_no < 0: + return 0 return self._line_no @property diff --git a/src/deep/api/tracepoint/trigger.py b/src/deep/api/tracepoint/trigger.py index a01d468..bcc2b50 100644 --- a/src/deep/api/tracepoint/trigger.py +++ b/src/deep/api/tracepoint/trigger.py @@ -29,13 +29,15 @@ """Handlers for triggers and action configs.""" import abc +import inspect from enum import Enum +from types import FrameType from typing import Optional, Dict, List from deep.api.tracepoint.constants import WINDOW_START, WINDOW_END, FIRE_COUNT, FIRE_PERIOD, LOG_MSG, WATCHES, \ LINE_START, METHOD_START, METHOD_END, LINE_END, LINE_CAPTURE, METHOD_CAPTURE, NO_COLLECT, SNAPSHOT, CONDITION, \ - FRAME_TYPE, STACK_TYPE, SINGLE_FRAME_TYPE, STACK, SPAN, STAGE, METHOD_NAME, LINE_STAGES, METHOD_STAGES + FRAME_TYPE, STACK_TYPE, SINGLE_FRAME_TYPE, STACK, SPAN, STAGE, METHOD_NAME, LINE_STAGES, METHOD_STAGES, METHOD from deep.api.tracepoint.tracepoint_config import TracepointWindow, TracepointExecutionStats, MetricDefinition, \ TracePointConfig @@ -82,7 +84,7 @@ def __init__(self, tp_id: str, condition: Optional[str], config: Dict[str, any], self.__window = TracepointWindow(self.__config.get(WINDOW_START, 0), self.__config.get(WINDOW_END, 0)) self.__stats = TracepointExecutionStats() self.__action_type = action_type - self.__location: 'Location | None' = None + self.__location: Optional['Location'] = None @property def id(self) -> str: @@ -134,6 +136,11 @@ def 
action_type(self) -> ActionType: """Get the action type.""" return self.__action_type + @property + def location(self) -> Optional['Location']: + """Get the location config.""" + return self.__location + @property def tracepoint(self) -> TracePointConfig: """Get the tracepoint config for this trigger.""" @@ -256,14 +263,15 @@ def __init__(self, position: Position = None): self.position = position @abc.abstractmethod - def at_location(self, event: str, file: str, line: int, method: str) -> bool: + def at_location(self, event: str, file: str, line: int, function_name: str, frame: FrameType) -> bool: """ Check if we are at the location defined by this location. :param event: the trigger event :param file: the file path :param line: the line number - :param method: the method name + :param function_name: the function name + :param frame: the triggering frame object :return: True, if we are at this location we expect, else False. """ pass @@ -286,32 +294,51 @@ def line(self) -> int: """The line number.""" pass + @property + @abc.abstractmethod + def name(self) -> str: + """ + The name for this location. + + For Method locations should be method name. + For line location should be the file#line number. + """ + pass + class Trigger(Location): - """A trigger is a location with action.""" + """ + A trigger is a location with actions. + + A trigger describes the location at which deep should take some actions. + + A location is the combination of the file, line, function name, and position. Combining these we can add + actions at the start or end of lines, or functions. + """ def __init__(self, location: Location, actions: List[LocationAction]): """ Create new trigger. 
- :param location: the underlying location + :param location: the location :param actions: the actions """ super().__init__() self.__location = location self.__actions = actions - def at_location(self, event: str, file: str, line: int, method: str) -> bool: + def at_location(self, event: str, file: str, line: int, function_name: str, frame: FrameType) -> bool: """ Check if we are at the location defined by this location. :param event: the trigger event :param file: the file path :param line: the line number - :param method: the method name + :param function_name: the method name + :param frame: the triggering frame object :return: True, if we are at this location we expect, else False. """ - return self.__location.at_location(event, file, line, method) + return self.__location.at_location(event, file, line, function_name, frame) @property def actions(self) -> List[LocationAction]: @@ -333,6 +360,16 @@ def line(self): """The line number.""" return self.__location.line + @property + def name(self) -> str: + """ + The name for this location. + + For Method locations should be method name. + For line location should be the file#line number. + """ + return self.__location.name + def __str__(self): """Represent this as a string.""" return str({ @@ -370,14 +407,17 @@ def __init__(self, path: str, line: int, position: Location.Position): self.__path = path self.__line = line - def at_location(self, event: str, file: str, line: int, method: str): + def at_location(self, event: str, file: str, line: int, function_name: str, frame: FrameType): """ Check if we are at the location defined by this location. + Line actions must always trigger on the line they define. So we do not look at the position here. + :param event: the trigger event :param file: the file path :param line: the line number - :param method: the method name + :param function_name: the method name + :param frame: the triggering frame object :return: True, if we are at this location we expect, else False. 
""" if event == "line" and file == self.path and line == self.line: @@ -399,6 +439,15 @@ def line(self): """The line number.""" return self.__line + @property + def name(self) -> str: + """ + The name for this location. + + For line location should be the file#line number. + """ + return f'{self.path}#{self.line}' + def __str__(self): """Represent this as a string.""" return str(self.__dict__) @@ -414,39 +463,55 @@ def __eq__(self, __value): return False -class MethodLocation(Location): +class FunctionLocation(Location): """A location for a method entry/exit/capture point.""" - def __init__(self, path: str, method: str, position: Location.Position): + def __init__(self, path: str, function_name: Optional[str], position: Location.Position): """ Create a new method location. :param path: the source file path - :param method: the method name + :param function_name: the function name :param position: the position """ super().__init__(position) - self.method = method + self.__function_name = function_name self.__path = path - def at_location(self, event: str, file: str, line: int, method: str): + def at_location(self, event: str, file: str, line: int, function_name: str, frame: FrameType): """ Check if we are at the location defined by this location. :param event: the trigger event :param file: the file path :param line: the line number - :param method: the method name + :param function_name: the method name + :param frame: the triggering frame object :return: True, if we are at this location we expect, else False. """ - if event == "CALL" and method == self.method and file == self.path: + if file != self.path: + return False + + # if method_name is not set then we need to discover it from the frame. 
+ if self.__function_name is None: + # load source lines + lines, start = inspect.getsourcelines(frame) + end = start + len(lines) + # if the targeted line is in the range of start (inclusive) to end (exclusive) + if start <= line < end: + # set the method to this name (so we do not need to look it up again) + self.__function_name = function_name + return True + return False + + if event == "call" and function_name == self.__function_name: return True return False @property def id(self): """The location id.""" - return "%s#%s" % (self.path, self.method) + return "%s#%s" % (self.path, self.__function_name) @property def path(self): @@ -458,6 +523,15 @@ def line(self): """The method location always has a line of -1.""" return -1 + @property + def name(self) -> str: + """ + The name for this location. + + For Method locations should be method name. + """ + return self.__function_name + def __str__(self): """Represent this as a string.""" return str(self.__dict__) @@ -468,7 +542,7 @@ def __repr__(self): def __eq__(self, __value): """Check if this is equal to another.""" - if self.path == __value.path and self.method == __value.method: + if self.path == __value.path and self.__function_name == __value.__function_name: return True return False @@ -554,13 +628,13 @@ def build_span_action(tp_id: str, args: Dict[str, str]) -> Optional[LocationActi SPAN: args[SPAN], FIRE_COUNT: args.get(FIRE_COUNT, '1'), FIRE_PERIOD: args.get(FIRE_PERIOD, '1000'), - }, LocationAction.ActionType.Snapshot) + }, LocationAction.ActionType.Span) def build_trigger(tp_id: str, path: str, line_no: int, args: Dict[str, str], watches: List[str], metrics: List[MetricDefinition]) -> Optional[Trigger]: """ - Buidl a trigger definition. + Build a trigger definition. :param tp_id: the tracepoint id :param path: the source file path @@ -571,13 +645,18 @@ def build_trigger(tp_id: str, path: str, line_no: int, args: Dict[str, str], wat :return: the trigger with the actions.
""" stage_ = METHOD_START if METHOD_NAME in args else LINE_START + + if SPAN in args and args[SPAN] == METHOD: + stage_ = METHOD_START + if STAGE in args: stage_ = args[STAGE] + position = Location.Position.from_stage(stage_) if stage_ in LINE_STAGES: - location = LineLocation(path, line_no, Location.Position.from_stage(stage_)) + location = LineLocation(path, line_no, position) elif stage_ in METHOD_STAGES: - location = MethodLocation(path, args[METHOD_NAME], Location.Position.from_stage(stage_)) + location = FunctionLocation(path, args.get(METHOD_NAME, None), position) else: return None diff --git a/src/deep/config/config_service.py b/src/deep/config/config_service.py index b53fe2b..a26c6b9 100644 --- a/src/deep/config/config_service.py +++ b/src/deep/config/config_service.py @@ -21,6 +21,7 @@ from deep import logging from deep.api.plugin import Plugin, ResourceProvider, PLUGIN_TYPE, SnapshotDecorator, TracepointLogger from deep.api.plugin.metric import MetricProcessor +from deep.api.plugin.span import SpanProcessor from deep.api.resource import Resource from deep.config.tracepoint_config import TracepointConfigService, ConfigUpdateListener @@ -152,6 +153,16 @@ def has_metric_processor(self) -> bool: """Is there a configured metric processor.""" return self._find_plugin(MetricProcessor) is not None + @property + def span_processors(self) -> Generator[SpanProcessor, None, None]: + """Generator for span processors.""" + return self.__plugin_generator(SpanProcessor) + + @property + def has_span_processor(self) -> bool: + """Is there a configured span processor.""" + return self._find_plugin(SpanProcessor) is not None + def is_app_frame(self, filename: str) -> Tuple[bool, Optional[str]]: """ Check if the current frame is a user application frame.
diff --git a/src/deep/processor/context/action_context.py b/src/deep/processor/context/action_context.py index 64c3b0d..fcecce8 100644 --- a/src/deep/processor/context/action_context.py +++ b/src/deep/processor/context/action_context.py @@ -52,7 +52,7 @@ def __init__(self, parent: 'TriggerContext', action: 'LocationAction'): :param parent: the parent trigger :param action: the action config """ - self.tigger_context: 'TriggerContext' = parent + self.trigger_context: 'TriggerContext' = parent self.location_action: 'LocationAction' = action self._triggered = False @@ -63,7 +63,7 @@ def __enter__(self): def __exit__(self, exception_type, exception_value, exception_traceback): """Exit and close the context.""" if self.has_triggered(): - self.location_action.record_triggered(self.tigger_context.ts) + self.location_action.record_triggered(self.trigger_context.ts) def eval_watch(self, watch: str) -> Tuple[WatchResult, Dict[str, Variable], str]: """ @@ -72,10 +72,10 @@ def eval_watch(self, watch: str) -> Tuple[WatchResult, Dict[str, Variable], str] :param watch: The watch expression to evaluate. :return: Tuple with WatchResult, collected variables, and the log string for the expression """ - var_processor = VariableSetProcessor({}, self.tigger_context.var_cache) + var_processor = VariableSetProcessor({}, self.trigger_context.var_cache) try: - result = self.tigger_context.evaluate_expression(watch) + result = self.trigger_context.evaluate_expression(watch) variable_id, log_str = var_processor.process_variable(watch, result) return WatchResult(watch, variable_id), var_processor.var_lookup, log_str @@ -109,21 +109,14 @@ def can_trigger(self) -> bool: Combine checks for rate limits, windows and condition. :return: True, if the trigger can be triggered. 
""" - if not self.location_action.can_trigger(self.tigger_context.ts): + if not self.location_action.can_trigger(self.trigger_context.ts): return False if self.location_action.condition is None or len(self.location_action.condition.strip()) == 0: return True - result = self.tigger_context.evaluate_expression(self.location_action.condition) + result = self.trigger_context.evaluate_expression(self.location_action.condition) return str2bool(str(result)) -class SpanActionContext(ActionContext): - """Action for spans.""" - - def _process_action(self): - pass - - class NoActionContext(ActionContext): """Default context if no action can be determined.""" diff --git a/src/deep/processor/context/action_results.py b/src/deep/processor/context/action_results.py index 5a0b6a0..d4e2c0e 100644 --- a/src/deep/processor/context/action_results.py +++ b/src/deep/processor/context/action_results.py @@ -42,6 +42,7 @@ """Handler results of actions.""" import abc +from types import FrameType from typing import Optional, TYPE_CHECKING if TYPE_CHECKING: @@ -51,12 +52,14 @@ class ActionCallback: """A call back to 'close' an action.""" - def process(self, frame, event) -> bool: + @abc.abstractmethod + def process(self, event: str, frame: FrameType, arg: any) -> bool: """ Process a callback. - :param frame: the frame data :param event: the event + :param frame: the frame data + :param arg: the arg from settrace :return: True, to keep this callback until next match. 
""" pass diff --git a/src/deep/processor/context/callback_context.py b/src/deep/processor/context/callback_context.py new file mode 100644 index 0000000..a0f9e3f --- /dev/null +++ b/src/deep/processor/context/callback_context.py @@ -0,0 +1,132 @@ +# Copyright (C) 2024 Intergral GmbH +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. + +"""Handling for action callbacks.""" +from types import FrameType +from typing import List + +from deep.api.tracepoint.trigger import Location + +from deep.processor.context.action_results import ActionCallback + + +class CallbackContext(Location, ActionCallback): + """ + Callback Context deals with ensuring we close any pending actions created by TriggerHandler. + + If a span is created on a line or method, then we attach a callback that will trigger the span + to close when the line/method completes. + """ + + def __init__(self, event: str, filename: str, line: int, name: str, callbacks: List['ActionCallback']): + """Create new callback context.""" + super().__init__(Location.Position.END) + self.__event = event + self.__filename = filename + self.__function_name = name + self.__line = line + self.__callbacks = callbacks + + def at_location(self, event: str, file: str, line: int, function_name: str, frame: FrameType) -> bool: + """ + Check if we are at the location defined by this location.
+ + :param event: the trigger event + :param file: the file path + :param line: the line number + :param function_name: the function name + :param frame: the triggering frame object + :return: True, if we are at this location we expect, else False. + """ + # if either the file or function name are different, then we are not in the correct place. + if file != self.__filename or function_name != self.__function_name: + return False + + if self.__event == 'line': + return self.__check_at_next_line(event, file, function_name) + else: + return self.__check_at_method_end(event) + + def process(self, event: str, frame: FrameType, arg: any): + """ + Process all callbacks. + + :param event: the event + :param frame: the frame data + :param arg: the arg from settrace + :return: True, to keep this callback until next match. + """ + for callback in self.__callbacks: + callback.process(event, frame, arg) + + @property + def id(self) -> str: + """The location id.""" + return "%s#%s" % (self.path, self.name) + + @property + def path(self) -> str: + """The source file path.""" + return self.__filename + + @property + def line(self) -> int: + """The source line number.""" + return self.__line + + @property + def name(self) -> str: + """The function name.""" + return self.__function_name + + def __check_at_next_line(self, event: str, file: str, function_name: str) -> bool: + """ + Check if the new position is the next line after that which triggered this line event. + + When a 'line' event triggers a callback, then we are trying to track a line execution. This means we should + trigger the callback when we are at the next logical line. This would be either the next 'line' event in the + same file/function, or the 'exception' or 'return' event for the file/function that triggered the callback. + + :param event: the current event + :param file: the current file + :param function_name: the current function + :return: True, if we are the next logical line of code. 
+ """ + # if we are at a new line event, then we want to check the file/line number + if event == 'line': + # if we are the same file and function then we are the next line + if file == self.__filename and function_name == self.__function_name: + return True + # if we are line, but not the same file and function then we are not in the correct place + return False + + # If we are not line, then we have to be the return or error from the method we started in + return True + + def __check_at_method_end(self, event: str) -> bool: + """ + Check if the new position is the end of the method we are wrapping. + + When a 'call' event triggers a callback, then we are trying to wrap a method execution. This means we should + trigger the callback when the method ends. this is when the event 'exception' or 'return' is seen with the + same file and function name. + + :param event: the current event + :return: True, if we are the next logical line of code. + + """ + if event in ['exception', 'return']: + return True + return False diff --git a/src/deep/processor/context/log_action.py b/src/deep/processor/context/log_action.py index 847b1de..a17d48f 100644 --- a/src/deep/processor/context/log_action.py +++ b/src/deep/processor/context/log_action.py @@ -48,7 +48,7 @@ class LogActionContext(ActionContext): def _process_action(self): log_msg = self.location_action.config.get(LOG_MSG) log, watches, vars_ = self.process_log(log_msg) - self.tigger_context.attach_result(LogActionResult(self.location_action, log)) + self.trigger_context.attach_result(LogActionResult(self.location_action, log)) def process_log(self, log_msg) -> Tuple[str, List['WatchResult'], Dict[str, 'Variable']]: """ @@ -90,7 +90,7 @@ def get_field(self, field_name, args, kwargs): return log_str, field_name - log_msg = "[deep] %s" % FormatExtractor().vformat(log_msg, (), FormatDict(self.tigger_context.locals)) + log_msg = "[deep] %s" % FormatExtractor().vformat(log_msg, (), FormatDict(self.trigger_context.locals)) return 
log_msg, watch_results, _var_lookup diff --git a/src/deep/processor/context/metric_action.py b/src/deep/processor/context/metric_action.py index 745779e..2c5cad2 100644 --- a/src/deep/processor/context/metric_action.py +++ b/src/deep/processor/context/metric_action.py @@ -40,12 +40,12 @@ def _process_action(self): metrics = self._metrics() for metric in metrics: labels, value = self._process_metric(metric) - for processor in self.tigger_context.config.metric_processors: + for processor in self.trigger_context.config.metric_processors: getattr(processor, self._convert_type(metric.type))(metric.name, labels, metric.namespace or "deep", metric.help, metric.unit, value) def __has_metric_processor(self): - return self.tigger_context.config.has_metric_processor + return self.trigger_context.config.has_metric_processor def _metrics(self) -> List[MetricDefinition]: return self.location_action.config['metrics'] @@ -57,7 +57,7 @@ def _process_metric(self, metric: MetricDefinition) -> Tuple[Dict[str, str], flo metric_value = 1 if metric.expression: try: - metric_value = float(self.tigger_context.evaluate_expression(metric.expression)) + metric_value = float(self.trigger_context.evaluate_expression(metric.expression)) except Exception: deep.logging.exception("Cannot process metric expression %s", metric.expression) @@ -67,7 +67,7 @@ def _process_metric(self, metric: MetricDefinition) -> Tuple[Dict[str, str], flo key = label.key if label.expression: try: - value = str(self.tigger_context.evaluate_expression(label.expression)) + value = str(self.trigger_context.evaluate_expression(label.expression)) except Exception: deep.logging.exception("Cannot process metric label expression %s: %s", key, label.expression) value = 'expression failed' diff --git a/src/deep/processor/context/snapshot_action.py b/src/deep/processor/context/snapshot_action.py index faf9b1f..eef47a3 100644 --- a/src/deep/processor/context/snapshot_action.py +++ b/src/deep/processor/context/snapshot_action.py @@ 
-68,7 +68,7 @@ def collection_config(self) -> VariableProcessorConfig: @property def ts(self) -> int: """The timestamp in nanoseconds for this trigger.""" - return self.tigger_context.ts + return self.trigger_context.ts def should_collect_vars(self, current_frame_index: int) -> bool: """ @@ -93,7 +93,7 @@ def is_app_frame(self, filename: str) -> Tuple[bool, str]: :param filename: the frame file name :return: True if add frame, else False """ - return self.tigger_context.config.is_app_frame(filename) + return self.trigger_context.config.is_app_frame(filename) @property def watches(self): @@ -106,12 +106,12 @@ def log_msg(self): return self.location_action.config.get(LOG_MSG, None) def _process_action(self): - collector = FrameCollector(self, self.tigger_context.frame) + collector = FrameCollector(self, self.trigger_context.frame) - frames, variables = collector.collect(self.tigger_context.vars, self.tigger_context.var_cache) + frames, variables = collector.collect(self.trigger_context.vars, self.trigger_context.var_cache) - snapshot = EventSnapshot(self.location_action.tracepoint, self.tigger_context.ts, self.tigger_context.resource, - frames, variables) + snapshot = EventSnapshot(self.location_action.tracepoint, self.trigger_context.ts, + self.trigger_context.resource, frames, variables) # process the snapshot watches for watch in self.watches: @@ -122,7 +122,7 @@ def _process_action(self): log_msg = self.log_msg if log_msg is not None: # create and process the log message - context = LogActionContext(self.tigger_context, LocationAction(self.location_action.id, None, { + context = LogActionContext(self.trigger_context, LocationAction(self.location_action.id, None, { LOG_MSG: log_msg, }, LocationAction.ActionType.Log)) log, watches, log_vars = context.process_log(log_msg) @@ -130,9 +130,9 @@ def _process_action(self): for watch in watches: snapshot.add_watch_result(watch) snapshot.merge_var_lookup(log_vars) - 
self.tigger_context.attach_result(LogActionResult(context.location_action, log)) + self.trigger_context.attach_result(LogActionResult(context.location_action, log)) - self.tigger_context.attach_result(SendSnapshotActionResult(self, snapshot)) + self.trigger_context.attach_result(SendSnapshotActionResult(self, snapshot)) class SendSnapshotActionResult(ActionResult): @@ -156,14 +156,14 @@ def process(self, ctx: 'TriggerContext') -> Optional[ActionCallback]: :return: an action callback if we need to do something at the 'end', or None """ - attributes = BoundedAttributes(attributes={'ctx_id': ctx.id}) + attributes = BoundedAttributes(attributes={'ctx_id': ctx.id}, immutable=False) for decorator in ctx.config.snapshot_decorators: try: decorate = decorator.decorate(self.action_context) if decorate is not None: - attributes = attributes.merge_in(decorate) + attributes.merge_in(decorate) except Exception: - deep.logging.exception("Failed to decorate snapshot: %s", decorator) + deep.logging.exception("Failed to decorate snapshot: %s ", decorator) self.snapshot.attributes.merge_in(attributes) ctx.push_service.push_snapshot(self.snapshot) diff --git a/src/deep/processor/context/span_action.py b/src/deep/processor/context/span_action.py new file mode 100644 index 0000000..189859f --- /dev/null +++ b/src/deep/processor/context/span_action.py @@ -0,0 +1,99 @@ +# Copyright (C) 2024 Intergral GmbH +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. 
+# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. + +"""Handling for span actions.""" +from types import FrameType +from typing import Optional, TYPE_CHECKING + +from deep.processor.context.action_context import ActionContext +from deep.processor.context.action_results import ActionResult, ActionCallback + +if TYPE_CHECKING: + from deep.processor.context.trigger_context import TriggerContext + + +class SpanActionCallback(ActionCallback): + """Action callback to close created spans.""" + + def __init__(self, spans): + """Create callback.""" + self.__spans = spans + + def process(self, event: str, frame: FrameType, arg: any) -> bool: + """ + Process a callback. + + :param event: the event + :param frame: the frame data + :param arg: the arg from settrace + :return: True, to keep this callback until next match. + """ + for span in self.__spans: + span.close() + return False + + +class SpanResult(ActionResult): + """Action result to map to callback.""" + + def __init__(self, spans): + """Create result.""" + self.__spans = spans + + def process(self, ctx: 'TriggerContext') -> Optional[ActionCallback]: + """ + Process this result. + + :param ctx: the triggering context + + :return: an action callback if we need to do something at the 'end', or None + """ + return SpanActionCallback(self.__spans) + + +class SpanActionContext(ActionContext): + """Action for spans.""" + + def can_trigger(self) -> bool: + """ + Check if the action can trigger. + + If we do not have a span processor enabled, then skip this action. + :return: True, if the trigger can be triggered. 
+ """ + if self.trigger_context.config.has_span_processor: + return super().can_trigger() + return False + + def _process_action(self): + name = self._span_name + if name is None: + return + + spans = [] + + for span_processor in self.trigger_context.config.span_processors: + span = span_processor.create_span(name) + if span: + spans.append(span) + + if len(spans) > 0: + self.trigger_context.attach_result(SpanResult(spans)) + + @property + def _span_name(self): + location = self.location_action.location + if location: + return location.name diff --git a/src/deep/processor/context/trigger_context.py b/src/deep/processor/context/trigger_context.py index 2118c32..8930784 100644 --- a/src/deep/processor/context/trigger_context.py +++ b/src/deep/processor/context/trigger_context.py @@ -24,11 +24,12 @@ from deep.api.tracepoint import Variable from deep.api.tracepoint.trigger import LocationAction from deep.config import ConfigService -from deep.processor.context.action_context import SpanActionContext, NoActionContext, ActionContext +from deep.processor.context.action_context import NoActionContext, ActionContext from deep.processor.context.action_results import ActionResult, ActionCallback from deep.processor.context.log_action import LogActionContext from deep.processor.context.metric_action import MetricActionContext from deep.processor.context.snapshot_action import SnapshotActionContext +from deep.processor.context.span_action import SpanActionContext from deep.processor.frame_collector import FrameCollector from deep.processor.variable_set_processor import VariableCacheProvider from deep.push import PushService diff --git a/src/deep/processor/trigger_handler.py b/src/deep/processor/trigger_handler.py index 34ba5a5..0198850 100644 --- a/src/deep/processor/trigger_handler.py +++ b/src/deep/processor/trigger_handler.py @@ -33,7 +33,7 @@ from deep.api.tracepoint.trigger import Trigger from deep.config import ConfigService from deep.config.tracepoint_config import 
ConfigUpdateListener -from deep.processor.context.action_results import ActionCallback +from deep.processor.context.callback_context import CallbackContext from deep.processor.context.trigger_context import TriggerContext from deep.push import PushService from deep.thread_local import ThreadLocal @@ -79,8 +79,6 @@ class TriggerHandler: This is where we 'listen' for a hit, and determine if we should collect data. """ - __callbacks: ThreadLocal[Deque[List[ActionCallback]]] = ThreadLocal(lambda: deque()) - def __init__(self, config: ConfigService, push_service: PushService): """ Create a new tigger handler. @@ -94,6 +92,7 @@ def __init__(self, config: ConfigService, push_service: PushService): self._tp_config: List[Trigger] = [] self._config = config self._config.add_listener(TracepointHandlerUpdateListener(self)) + self._callbacks: ThreadLocal[Deque[CallbackContext]] = ThreadLocal(lambda: deque()) def start(self): """Start the trigger handler.""" @@ -136,15 +135,15 @@ def trace_call(self, frame: FrameType, event: str, arg): :param arg: the args :return: None to ignore other calls, or our self to continue """ - if event in ["line", "return", "exception"] and self.__callbacks.is_set: - self.__process_call_backs(frame, event) + event, file, line, function = self.location_from_event(event, frame) + if event in ["line", "return", "exception"] and self._callbacks.is_set: + self.__process_call_backs(arg, frame, event, file, line, function) # return if we do not have any tracepoints if len(self._tp_config) == 0: return None - event, file, line, function = self.location_from_event(event, frame) - actions = self.__actions_for_location(event, file, line, function) + actions = self.__actions_for_location(event, file, line, function, frame) if len(actions) == 0: return self.trace_call @@ -162,25 +161,36 @@ def trace_call(self, frame: FrameType, event: str, arg): except BaseException: logging.exception("Cannot trigger at %s#%s %s", file, line, function) - 
self.__callbacks.get().append(trigger_context.callbacks) + callbacks = trigger_context.callbacks + if len(callbacks) > 0: + logging.debug("Callbacks registered: %s", callbacks) + self._callbacks.get().append( + CallbackContext(event, file, line, function, callbacks)) return self.trace_call - def __actions_for_location(self, event, file, line, function): + def __actions_for_location(self, event, file, line, function, frame): actions = [] for trigger in self._tp_config: - if trigger.at_location(event, file, line, function): + if trigger.at_location(event, file, line, function, frame): actions += trigger.actions return actions - def __process_call_backs(self, frame: FrameType, event: str): - callbacks = self.__callbacks.value.pop() - remaining: List[ActionCallback] = [] - for callback in callbacks: - if callback.process(frame, event): - remaining.append(callback) - - self.__callbacks.value.append(remaining) + def __process_call_backs(self, arg: any, frame: FrameType, event: str, file: str, line: int, function_name: str): + # remove top context + context: CallbackContext = self._callbacks.value.pop() + # if it is for our location process it + if context.at_location(event, file, line, function_name, frame): + logging.debug("At callback location %s", context.name) + context.process(event, frame, arg) + else: + logging.debug("Not at callback location %s", context.name) + # else put the context back on the queue + self._callbacks.value.append(context) + + if len(self._callbacks.value) == 0: + logging.debug("Callbacks cleared.") + self._callbacks.clear() @staticmethod def location_from_event(event: str, frame: FrameType) -> Tuple[str, str, int, Optional[str]]: diff --git a/src/deep/thread_local.py b/src/deep/thread_local.py index fe7981d..2180f62 100644 --- a/src/deep/thread_local.py +++ b/src/deep/thread_local.py @@ -42,7 +42,11 @@ def get(self) -> T: :return: the stored value, or the value from the default_provider """ current_thread = threading.current_thread() - return 
self.__store.get(current_thread.ident, self.__default_provider()) + get = self.__store.get(current_thread.ident, None) + if get is None: + get = self.__default_provider() + self.__store[current_thread.ident] = get + return get def set(self, val: T): """ @@ -56,8 +60,8 @@ def set(self, val: T): def clear(self): """Remove the value for this thread.""" current_thread = threading.current_thread() - if current_thread in self.__store: - del self.__store[current_thread] + if current_thread.ident in self.__store: + del self.__store[current_thread.ident] @property def is_set(self): @@ -67,7 +71,7 @@ def is_set(self): :return: True if there is a value for this thread """ current_thread = threading.current_thread() - return current_thread in self.__store + return current_thread.ident in self.__store @property def value(self): diff --git a/tests/it_tests/test_target.py b/tests/it_tests/test_target.py index ee01edf..a4458aa 100644 --- a/tests/it_tests/test_target.py +++ b/tests/it_tests/test_target.py @@ -74,4 +74,12 @@ def some_func_with_body(self, some_arg): name = self.__name new_name = name + some_arg i = random.randint(3, 9) - return i + new_name + return str(i) + new_name + + def some_func_with_body_long(self, some_arg): + name = self.__name + new_name = name + some_arg + i = random.randint(3, 9) + self.finally_something(some_arg) + self.throw_something(some_arg) + return str(i) + new_name diff --git a/tests/unit_tests/processor/test_trigger_handler.py b/tests/unit_tests/processor/test_trigger_handler.py index 9bd8c77..7d539c0 100644 --- a/tests/unit_tests/processor/test_trigger_handler.py +++ b/tests/unit_tests/processor/test_trigger_handler.py @@ -36,12 +36,13 @@ from deep import logging from deep.api.plugin import TracepointLogger from deep.api.plugin.metric import MetricProcessor +from deep.api.plugin.span import SpanProcessor from deep.api.resource import Resource from deep.api.tracepoint.constants import LOG_MSG, WATCHES from deep.api.tracepoint.eventsnapshot import 
EventSnapshot from deep.api.tracepoint.tracepoint_config import MetricDefinition -from deep.api.tracepoint.trigger import Location, LocationAction, LineLocation, Trigger +from deep.api.tracepoint.trigger import Location, LocationAction, LineLocation, Trigger, FunctionLocation from deep.config import ConfigService from deep.processor.trigger_handler import TriggerHandler from deep.push.push_service import PushService @@ -60,6 +61,7 @@ def push_snapshot(self, snapshot: EventSnapshot): class MockTracepointLogger(TracepointLogger): def __init__(self): + super().__init__() self.logged = [] def log_tracepoint(self, log_msg: str, tp_id: str, ctx_id: str): @@ -90,7 +92,7 @@ def __init__(self): def capture_trace_call(self, location: Location): def trace_call(frame, event, args): event, file, line, function = TriggerHandler.location_from_event(event, frame) - if location.at_location(event, file, line, function): + if location.at_location(event, file, line, function, frame): self.captured_frame = frame self.captured_event = event self.captured_args = args @@ -228,3 +230,39 @@ def test_metric_action(self): self.assertEqual(0, len(pushed)) mockito.verify(mock_plugin, mockito.times(1)).counter("simple_test", {}, 'deep', None, None, 1) + + def test_span_action(self): + capture = TraceCallCapture() + config = MockConfigService({}) + mock_plugin = mockito.mock(spec=SpanProcessor) + mock_span = mockito.mock() + mockito.when(mock_plugin).create_span('some_test_function').thenReturn(mock_span) + config.plugins = [mock_plugin] + push = MockPushService(None, None) + handler = TriggerHandler(config, push) + + location = FunctionLocation('test_target.py', "some_test_function", Location.Position.START) + handler.new_config([Trigger(location, [ + LocationAction("tp_id", "", {}, + LocationAction.ActionType.Span)])]) + + self.call_and_capture(location, some_test_function, ['input'], capture) + + handler.trace_call(capture.captured_frame, capture.captured_event, capture.captured_args) + + # 
now extract the callback value + pop = handler._callbacks.value + # capture the real data that would be sent when we match this location + self.call_and_capture(pop[0], some_test_function, ['input'], capture) + + # now call our trace call to check our callbacks + handler.trace_call(capture.captured_frame, capture.captured_event, capture.captured_args) + + logged = config.logger.logged + self.assertEqual(0, len(logged)) + pushed = push.pushed + self.assertEqual(0, len(pushed)) + + mockito.verify(mock_plugin, mockito.times(1)).create_span("some_test_function") + + mockito.verify(mock_span, mockito.times(1)).close() diff --git a/tests/unit_tests/tracepoint/test_trigger.py b/tests/unit_tests/tracepoint/test_trigger.py index 8f4d030..09496e9 100644 --- a/tests/unit_tests/tracepoint/test_trigger.py +++ b/tests/unit_tests/tracepoint/test_trigger.py @@ -31,7 +31,7 @@ from parameterized import parameterized from deep.api.tracepoint.tracepoint_config import MetricDefinition -from deep.api.tracepoint.trigger import build_trigger, LineLocation, LocationAction, Trigger, Location +from deep.api.tracepoint.trigger import build_trigger, LineLocation, LocationAction, Trigger, Location, FunctionLocation class Test(TestCase): @@ -89,6 +89,43 @@ class Test(TestCase): 'fire_count': '1', 'fire_period': '1000', }, LocationAction.ActionType.Metric), + ])], + # should create span action + ["some.file", 123, {'span': 'line', 'snapshot': 'no_collect'}, [], [], + Trigger(LineLocation("some.file", 123, Location.Position.START), [ + LocationAction("tp-id", None, { + 'span': 'line', + 'fire_count': '1', + 'fire_period': '1000', + }, LocationAction.ActionType.Span), + ])], + # should create span action + ["some.file", 123, {'span': 'method', 'snapshot': 'no_collect'}, [], [], + Trigger(FunctionLocation("some.file", None, Location.Position.START), [ + LocationAction("tp-id", None, { + 'span': 'method', + 'fire_count': '1', + 'fire_period': '1000', + }, LocationAction.ActionType.Span), + ])], + # 
should create span action + ["some.file", -1, {'span': 'method', 'method_name': 'test_method', 'snapshot': 'no_collect'}, [], [], + Trigger(FunctionLocation("some.file", 'test_method', Location.Position.START), [ + LocationAction("tp-id", None, { + 'span': 'method', + 'fire_count': '1', + 'fire_period': '1000', + }, LocationAction.ActionType.Span), + ])], + # should create method close tracepoint + ["some.file", -1, + {'span': 'method', 'method_name': 'test_method', 'snapshot': 'no_collect', 'stage': 'method_end'}, [], [], + Trigger(FunctionLocation("some.file", 'test_method', Location.Position.END), [ + LocationAction("tp-id", None, { + 'span': 'method', + 'fire_count': '1', + 'fire_period': '1000', + }, LocationAction.ActionType.Span), ])] ]) def test_build_triggers(self, file, line, args, watches, metrics, expected):