Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# main (unreleased)

- **[FEATURE]**: feat(capture): add support for capture tracepoints [#34](https://github.com/intergral/deep/pull/34) [@Umaaz](https://github.com/Umaaz)

# 1.1.0 (06/02/2024)

- **[CHANGE]**: change(build): add doc string check to flake8 [#14](https://github.com/intergral/deep/pull/14) [@Umaaz](https://github.com/Umaaz)
Expand Down
20 changes: 19 additions & 1 deletion src/deep/api/tracepoint/eventsnapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,24 +391,37 @@ def __eq__(self, o) -> bool:
return o._vid == self._vid and o._name == self._name and o._modifiers == self._modifiers


WATCH_SOURCE_WATCH = "WATCH"
"""Watch source for user watch statements."""
WATCH_SOURCE_LOG = "LOG"
"""Watch source for log expressions."""
WATCH_SOURCE_METRIC = "METRIC"
"""Watch source for metric expressions."""
WATCH_SOURCE_CAPTURE = "CAPTURE"
"""Watch source for captured data."""


class WatchResult:
"""This is the result of a watch expression."""

def __init__(self,
source: str,
expression: str,
result: Optional['VariableId'],
error: Optional[str] = None
error: Optional[str] = None,
):
"""
Create new watch result.

:param source: the watch source
:param expression: the expression used
:param result: the result of the expression
:param error: the error captured during execution
"""
self._expression = expression
self._result = result
self._error = error
self.__source = source

@property
def expression(self) -> str:
Expand All @@ -424,3 +437,8 @@ def result(self) -> Optional['VariableId']:
def error(self) -> Optional[str]:
"""The error."""
return self._error

@property
def source(self):
"""The watch source."""
return self.__source
2 changes: 1 addition & 1 deletion src/deep/grpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def convert_label_expressions(label_expressions) -> List[LabelExpression]:


def __convert_metric_definition(metrics):
return [MetricDefinition(m.name, MetricType.Name(metrics[0].type), convert_label_expressions(m.labelExpressions),
return [MetricDefinition(m.name, MetricType.Name(m.type), convert_label_expressions(m.labelExpressions),
m.expression, m.namespace, m.help, m.unit) for m in metrics]


Expand Down
21 changes: 18 additions & 3 deletions src/deep/processor/context/action_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from typing import Tuple, TYPE_CHECKING, Dict

import deep.logging
from deep.api.tracepoint.eventsnapshot import WATCH_SOURCE_CAPTURE
from deep.logging import logging
from deep.api.tracepoint import WatchResult, Variable
from deep.processor.variable_set_processor import VariableSetProcessor
Expand Down Expand Up @@ -65,10 +66,11 @@ def __exit__(self, exception_type, exception_value, exception_traceback):
if self.has_triggered():
self.location_action.record_triggered(self.trigger_context.ts)

def eval_watch(self, watch: str) -> Tuple[WatchResult, Dict[str, Variable], str]:
def eval_watch(self, watch: str, source: str) -> Tuple[WatchResult, Dict[str, Variable], str]:
"""
Evaluate an expression in the current frame.

:param source: The watch source.
:param watch: The watch expression to evaluate.
:return: Tuple with WatchResult, collected variables, and the log string for the expression
"""
Expand All @@ -78,10 +80,23 @@ def eval_watch(self, watch: str) -> Tuple[WatchResult, Dict[str, Variable], str]
result = self.trigger_context.evaluate_expression(watch)
variable_id, log_str = var_processor.process_variable(watch, result)

return WatchResult(watch, variable_id), var_processor.var_lookup, log_str
return WatchResult(source, watch, variable_id), var_processor.var_lookup, log_str
except BaseException as e:
logging.exception("Error evaluating watch %s", watch)
return WatchResult(watch, None, str(e)), {}, str(e)
return WatchResult(source, watch, None, str(e)), {}, str(e)

def process_capture_variable(self, name: str, variable: any) -> Tuple[WatchResult, Dict[str, Variable], str]:
"""
Process a captured variable (exception or return), into a variable set.

:param name: the name to use (raised or returned)
:param variable: the value to process
:return: Tuple with WatchResult, collected variables, and the log string for the expression
"""
var_processor = VariableSetProcessor({}, self.trigger_context.var_cache)
variable_id, log_str = var_processor.process_variable(name, variable)

return WatchResult(WATCH_SOURCE_CAPTURE, name, variable_id), var_processor.var_lookup, log_str

def process(self):
"""Process the action."""
Expand Down
3 changes: 2 additions & 1 deletion src/deep/processor/context/action_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,11 @@ class ActionCallback:
"""A call back to 'close' an action."""

@abc.abstractmethod
def process(self, event: str, frame: FrameType, arg: any) -> bool:
def process(self, ctx: 'TriggerContext', event: str, frame: FrameType, arg: any) -> bool:
"""
Process a callback.

:param ctx: the context for this trigger
:param event: the event
:param frame: the frame data
:param arg: the arg from settrace
Expand Down
6 changes: 4 additions & 2 deletions src/deep/processor/context/callback_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from deep.api.tracepoint.trigger import Location

from deep.processor.context.action_results import ActionCallback
from deep.processor.context.trigger_context import TriggerContext


class CallbackContext(Location, ActionCallback):
Expand Down Expand Up @@ -59,17 +60,18 @@ def at_location(self, event: str, file: str, line: int, function_name: str, fram
else:
return self.__check_at_method_end(event)

def process(self, event: str, frame: FrameType, arg: any):
def process(self, ctx: 'TriggerContext', event: str, frame: FrameType, arg: any):
"""
Process all callbacks.

:param ctx: the context for this trigger
:param event: the event
:param frame: the frame data
:param arg: the arg from settrace
:return: True, to keep this callback until next match.
"""
for callback in self.__callbacks:
callback.process(event, frame, arg)
callback.process(ctx, event, frame, arg)

@property
def id(self) -> str:
Expand Down
3 changes: 2 additions & 1 deletion src/deep/processor/context/log_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from .action_context import ActionContext
from .action_results import ActionResult, ActionCallback
from ...api.tracepoint.constants import LOG_MSG
from ...api.tracepoint.eventsnapshot import WATCH_SOURCE_LOG
from ...api.tracepoint.trigger import LocationAction

from typing import Tuple
Expand Down Expand Up @@ -83,7 +84,7 @@ class FormatExtractor(string.Formatter):

def get_field(self, field_name, args, kwargs):
# evaluate watch
watch, var_lookup, log_str = ctx_self.eval_watch(field_name)
watch, var_lookup, log_str = ctx_self.eval_watch(field_name, WATCH_SOURCE_LOG)
# collect data
watch_results.append(watch)
_var_lookup.update(var_lookup)
Expand Down
92 changes: 84 additions & 8 deletions src/deep/processor/context/snapshot_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Handling for snapshot actions."""

from types import FrameType
from typing import Tuple, Optional, TYPE_CHECKING

import deep.logging
from deep.api.attributes import BoundedAttributes
from deep.api.tracepoint import EventSnapshot
from deep.api.tracepoint.constants import FRAME_TYPE, SINGLE_FRAME_TYPE, NO_FRAME_TYPE, ALL_FRAME_TYPE
from deep.api.tracepoint.constants import FRAME_TYPE, SINGLE_FRAME_TYPE, NO_FRAME_TYPE, ALL_FRAME_TYPE, STAGE, \
LINE_CAPTURE, METHOD_CAPTURE
from deep.api.tracepoint.eventsnapshot import WATCH_SOURCE_WATCH
from deep.api.tracepoint.trigger import LocationAction
from deep.processor.context.action_context import ActionContext
from deep.processor.context.action_results import ActionResult, ActionCallback
Expand Down Expand Up @@ -115,7 +117,7 @@ def _process_action(self):

# process the snapshot watches
for watch in self.watches:
result, watch_lookup, _ = self.eval_watch(watch)
result, watch_lookup, _ = self.eval_watch(watch, WATCH_SOURCE_WATCH)
snapshot.add_watch_result(result)
snapshot.merge_var_lookup(watch_lookup)

Expand All @@ -132,11 +134,25 @@ def _process_action(self):
snapshot.merge_var_lookup(log_vars)
self.trigger_context.attach_result(LogActionResult(context.location_action, log))

self.trigger_context.attach_result(SendSnapshotActionResult(self, snapshot))
if self.trigger_context.event in ['exception', 'return']:
watch, new_vars, _ = self.process_capture_variable(self.trigger_context.event, self.trigger_context.arg)
snapshot.add_watch_result(watch)
snapshot.merge_var_lookup(new_vars)

if self._is_deferred():
self.trigger_context.attach_result(DeferredSnapshotActionResult(self, snapshot))
else:
self.trigger_context.attach_result(SendSnapshotActionResult(self, snapshot))

class SendSnapshotActionResult(ActionResult):
"""The result of a successful snapshot action."""
def _is_deferred(self):
stage = self.location_action.config.get(STAGE, None)
if stage is None:
return False
return stage == LINE_CAPTURE or stage == METHOD_CAPTURE


class DeferredSnapshotActionResult(ActionResult):
"""The result of a deferred snapshot action."""

def __init__(self, action_context: ActionContext, snapshot: EventSnapshot):
"""
Expand All @@ -156,6 +172,10 @@ def process(self, ctx: 'TriggerContext') -> Optional[ActionCallback]:

:return: an action callback if we need to do something at the 'end', or None
"""
snapshot = self._decorate_snapshot(ctx)
return DeferredSnapshotActionCallback(self.action_context, snapshot)

def _decorate_snapshot(self, ctx):
attributes = BoundedAttributes(attributes={'ctx_id': ctx.id}, immutable=False)
for decorator in ctx.config.snapshot_decorators:
try:
Expand All @@ -164,7 +184,63 @@ def process(self, ctx: 'TriggerContext') -> Optional[ActionCallback]:
attributes.merge_in(decorate)
except Exception:
deep.logging.exception("Failed to decorate snapshot: %s ", decorator)

self.snapshot.attributes.merge_in(attributes)
ctx.push_service.push_snapshot(self.snapshot)
return self.snapshot


class DeferredSnapshotActionCallback(ActionCallback):
"""Defer the send action to the end of the line or function."""

def __init__(self, action_context: ActionContext, snapshot: EventSnapshot):
"""
Create a new action callback.

:param action_context: the triggering action context
:param snapshot: the generated snapshot
"""
self.__action_context = action_context
self.__snapshot = snapshot

def process(self, ctx: 'TriggerContext', event: str, frame: FrameType, arg: any) -> bool:
"""
Process a callback.

:param ctx: the context for this trigger
:param event: the event
:param frame: the frame data
:param arg: the arg from settrace
:return: True, to keep this callback until next match.
"""
if event in ['exception', 'return']:
watch, new_vars, _ = self.__action_context.process_capture_variable(event, arg)
self.__snapshot.add_watch_result(watch)
self.__snapshot.merge_var_lookup(new_vars)

ctx.push_service.push_snapshot(self.__snapshot)
return False


class SendSnapshotActionResult(DeferredSnapshotActionResult):
"""The result of a successful snapshot action."""

def __init__(self, action_context: ActionContext, snapshot: EventSnapshot):
"""
Create a new snapshot action result.

:param action_context: the action context that created this result
:param snapshot: the snapshot result
"""
self.action_context = action_context
self.snapshot = snapshot

def process(self, ctx: 'TriggerContext') -> Optional[ActionCallback]:
"""
Process this result.

:param ctx: the triggering context

:return: an action callback if we need to do something at the 'end', or None
"""
snapshot = self._decorate_snapshot(ctx)
ctx.push_service.push_snapshot(snapshot)
return None
3 changes: 2 additions & 1 deletion src/deep/processor/context/span_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ def __init__(self, spans):
"""Create callback."""
self.__spans = spans

def process(self, event: str, frame: FrameType, arg: any) -> bool:
def process(self, ctx: 'TriggerContext', event: str, frame: FrameType, arg: any) -> bool:
"""
Process a callback.

:param ctx: the context for this trigger
:param event: the event
:param frame: the frame data
:param arg: the arg from settrace
Expand Down
14 changes: 13 additions & 1 deletion src/deep/processor/context/trigger_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,20 @@ class TriggerContext:
collect the data and ship of the results.
"""

def __init__(self, config: ConfigService, push_service: PushService, frame: FrameType, event: str):
def __init__(self, config: ConfigService, push_service: PushService, frame: FrameType, event: str, arg: any):
"""
Create a new trigger context.

:param config: the config service
:param push_service: the push service
:param frame: the frame data
:param event: the trigger event
:param arg: the trigger arg
"""
self.__push_service = push_service
self.__event = event
self.__frame = frame
self.__arg = arg
self.__config = config
self.__results: List[ActionResult] = []
self.__ts: int = time_ns()
Expand Down Expand Up @@ -114,6 +116,16 @@ def config(self) -> ConfigService:
"""The config service."""
return self.__config

@property
def arg(self):
"""The trigger arg value."""
return self.__arg

@property
def event(self):
"""The trigger event value."""
return self.__event

def action_context(self, action: 'LocationAction') -> 'ActionContext':
"""
Create an action context from this context, for the provided action.
Expand Down
10 changes: 6 additions & 4 deletions src/deep/processor/trigger_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,10 @@ def trace_call(self, frame: FrameType, event: str, arg):
:return: None to ignore other calls, or our self to continue
"""
event, file, line, function = self.location_from_event(event, frame)
trigger_context = TriggerContext(self._config, self._push_service, frame, event, arg)

if event in ["line", "return", "exception"] and self._callbacks.is_set:
self.__process_call_backs(arg, frame, event, file, line, function)
self.__process_call_backs(trigger_context, arg, frame, event, file, line, function)

# return if we do not have any tracepoints
if len(self._tp_config) == 0:
Expand All @@ -147,7 +149,6 @@ def trace_call(self, frame: FrameType, event: str, arg):
if len(actions) == 0:
return self.trace_call

trigger_context = TriggerContext(self._config, self._push_service, frame, event)
try:
with trigger_context:
for action in actions:
Expand Down Expand Up @@ -176,13 +177,14 @@ def __actions_for_location(self, event, file, line, function, frame):
actions += trigger.actions
return actions

def __process_call_backs(self, arg: any, frame: FrameType, event: str, file: str, line: int, function_name: str):
def __process_call_backs(self, ctx: 'TriggerContext', arg: any, frame: FrameType, event: str, file: str, line: int,
function_name: str):
# remove top context
context: CallbackContext = self._callbacks.value.pop()
# if it is for our location process it
if context.at_location(event, file, line, function_name, frame):
logging.debug("At callback location %s", context.name)
context.process(event, frame, arg)
context.process(ctx, event, frame, arg)
else:
logging.debug("Not at callback location %s", context.name)
# else put the context back on the queue
Expand Down
Loading