Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ rel-agent: check-version check-python-vars

.PHONY: rel-docker-test-app
rel-docker-test-app:
docker build -t ghcr.io/intergral/deep-python-client:simple-app $(ROOT_DIR)/examples/simple-app-docker
docker build -t intergral/deep-python:latest $(ROOT_DIR)/examples/simple-app-docker

docker push ghcr.io/intergral/deep-python-client:simple-app
docker push intergral/deep-python:latest

.PHONY: docs
docs:
Expand Down
1 change: 1 addition & 0 deletions deep-python-client.iml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
<sourceFolder url="file://$MODULE_DIR$/examples/simple-app/src" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/test" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/venv/lib/python3.10/site-packages/deep" />
</content>
<orderEntry type="jdk" jdkName="Python 3.10 (deep-python-client)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
Expand Down
4 changes: 4 additions & 0 deletions src/deep/api/deep.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from deep.config import ConfigService
from deep.config.tracepoint_config import TracepointConfigService
from deep.grpc import GRPCService
from deep.logging.tracepoint_logger import TracepointLogger
from deep.poll import LongPoll
from deep.processor import TriggerHandler
from deep.push import PushService
Expand Down Expand Up @@ -78,3 +79,6 @@ def get(self) -> TracePointConfig:

def unregister(self):
self._tpServ.remove_custom(self._cfg)

def tracepoint_logger(self, logger: 'TracepointLogger'):
self.config.tracepoint_logger = logger
30 changes: 21 additions & 9 deletions src/deep/api/tracepoint/eventsnapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# GNU Affero General Public License for more details.

import random
from typing import List, Dict
from typing import List, Dict, Optional

from deep.api.attributes import BoundedAttributes
from deep.api.resource import Resource
Expand All @@ -34,6 +34,7 @@ def __init__(self, tracepoint, ts, resource, frames, var_lookup: Dict[str, 'Vari
self._duration_nanos = 0
self._resource = Resource.get_empty().merge(resource)
self._open = True
self._log = None

def complete(self):
if not self._open:
Expand All @@ -44,10 +45,13 @@ def complete(self):
def is_open(self):
return self._open

def add_watch_result(self, watch_result, watch_lookup):
def add_watch_result(self, watch_result: 'WatchResult'):
if self.is_open():
self.watches.append(watch_result)
self._var_lookup.update(watch_lookup)

def merge_var_lookup(self, lookup: Dict[str, 'Variable']):
if self.is_open():
self._var_lookup.update(lookup)

@property
def id(self):
Expand Down Expand Up @@ -85,6 +89,14 @@ def duration_nanos(self):
def resource(self):
return self._resource

@property
def log_msg(self):
return self._log

@log_msg.setter
def log_msg(self, msg):
self._log = msg

def __str__(self) -> str:
return str(self.__dict__)

Expand Down Expand Up @@ -278,22 +290,22 @@ class WatchResult:
"""

def __init__(self,
expression,
result,
error=None
expression: str,
result: Optional['VariableId'],
error: Optional[str] = None
):
self._expression = expression
self._result = result
self._error = error

@property
def expression(self):
def expression(self) -> str:
return self._expression

@property
def result(self):
def result(self) -> Optional['VariableId']:
return self._result

@property
def error(self):
def error(self) -> Optional[str]:
return self._error
3 changes: 3 additions & 0 deletions src/deep/api/tracepoint/tracepoint_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@
NO_STACK = 'no_stack'
"""Do not collect the stack data"""

LOG_MSG = 'log_msg'
"""The log message to interpolate at position of tracepoint"""


def frame_type_ordinal(frame_type) -> int:
"""
Expand Down
15 changes: 14 additions & 1 deletion src/deep/config/config_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from deep.api.plugin import Plugin
from deep.api.resource import Resource
from deep.config.tracepoint_config import TracepointConfigService
from deep.logging.tracepoint_logger import DefaultLogger, TracepointLogger


class ConfigService:
Expand All @@ -33,6 +34,7 @@ def __init__(self, custom: Dict[str, any]):
self.__custom = custom
self._resource = None
self._tracepoint_config = TracepointConfigService()
self._tracepoint_logger: 'TracepointLogger' = DefaultLogger(self)

def __getattribute__(self, name: str) -> Any:
"""
Expand Down Expand Up @@ -95,8 +97,19 @@ def plugins(self, plugins):
self._plugins = plugins

@property
def tracepoints(self) -> TracepointConfigService:
def tracepoints(self) -> 'TracepointConfigService':
return self._tracepoint_config

def add_listener(self, listener):
self._tracepoint_config.add_listener(listener)

@property
def tracepoint_logger(self) -> 'TracepointLogger':
return self._tracepoint_logger

@tracepoint_logger.setter
def tracepoint_logger(self, logger: 'TracepointLogger'):
self._tracepoint_logger = logger

def log_tracepoint(self, log_msg: str, tp_id: str, snap_id: str):
self._tracepoint_logger.log_tracepoint(log_msg, tp_id, snap_id)
36 changes: 36 additions & 0 deletions src/deep/logging/tracepoint_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright (C) 2023 Intergral GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import abc

from typing import TYPE_CHECKING
if TYPE_CHECKING:
from deep.config import ConfigService

from deep import logging


class TracepointLogger(abc.ABC):

@abc.abstractmethod
def log_tracepoint(self, log_msg: str, tp_id: str, snap_id: str):
pass


class DefaultLogger(TracepointLogger):
def __init__(self, _config: 'ConfigService'):
self._config = _config

def log_tracepoint(self, log_msg: str, tp_id: str, snap_id: str):
logging.info(log_msg + " snapshot=%s tracepoint=%s" % (snap_id, tp_id))
65 changes: 55 additions & 10 deletions src/deep/processor/frame_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,23 @@
# GNU Affero General Public License for more details.

import abc
from typing import Dict, Tuple, Optional
from typing import Dict, Tuple, List, Optional

from deep import logging
from deep.api.tracepoint import StackFrame, WatchResult, Variable, VariableId
from deep.processor.bfs import Node, NodeValue, breadth_first_search, ParentNode
from deep.utils import time_ns
from .frame_config import FrameProcessorConfig
from .variable_processor import process_variable, process_child_nodes, variable_to_string, truncate_string, Collector
from ..config import ConfigService


class FrameCollector(Collector):
"""
This deals with collecting data from the paused frames.
"""

def __init__(self, frame, config):
def __init__(self, frame, config: ConfigService):
self._var_cache: Dict[str, str] = {}
self._config = config
self._has_time_exceeded = False
Expand All @@ -47,17 +48,56 @@ def configure_self(self):
def add_child_to_lookup(self, parent_id: str, child: VariableId):
self._var_lookup[parent_id].children.append(child)

def eval_watch(self, watch):
def log_tracepoint(self, log_msg: str, tp_id: str, snap_id: str):
self._config.log_tracepoint(log_msg, tp_id, snap_id)

def process_log(self, tp, log_msg) -> Tuple[str, List[WatchResult], Dict[str, Variable]]:
frame_col = self
watch_results = []
_var_lookup = {}

class FormatDict(dict):
"""This type is used in the log process to ensure that missing values are formatted don't error"""

def __missing__(self, key):
return "{%s}" % key

import string

class FormatExtractor(string.Formatter):
"""This type allows us to use watches within log strings and collect the watch
as well as interpolate the values"""

def get_field(self, field_name, args, kwargs):
# evaluate watch
watch, var_lookup, log_str = frame_col.eval_watch(field_name)
# collect data
watch_results.append(watch)
_var_lookup.update(var_lookup)

return log_str, field_name

log_msg = "[deep] %s" % FormatExtractor().vformat(log_msg, (), FormatDict(self._frame.f_locals))
return log_msg, watch_results, _var_lookup

def eval_watch(self, watch: str) -> Tuple[WatchResult, Dict[str, Variable], str]:
"""
Evaluate an expression in the current frame.
:param watch: The watch expression to evaluate.
:return: Tuple with WatchResult, collected variables, and the log string for the expression
"""
# reset var lookup - var cache is still used to reduce duplicates
self._var_lookup = {}

try:
result = eval(watch, None, self._frame.f_locals)
watch_var, var_lookup = self.process_watch_result_breadth_first(watch, result)
return WatchResult(watch, watch_var), var_lookup
watch_var, var_lookup, log_str = self.process_watch_result_breadth_first(watch, result)
# again we reset the local version of the var lookup.
self._var_lookup = {}
return WatchResult(watch, watch_var), var_lookup, log_str
except BaseException as e:
logging.exception("Error evaluating watch %s", watch)
return WatchResult(watch, None, str(e)), {}
return WatchResult(watch, None, str(e)), {}, str(e)

def process_frame(self):
"""
Expand All @@ -72,7 +112,11 @@ def process_frame(self):
frame = self._process_frame(current_frame, self._frame_config.should_collect_vars(len(collected_frames)))
collected_frames.append(frame)
current_frame = current_frame.f_back
return collected_frames, self._var_lookup
# We want to clear the local collected var lookup now that we have processed the frame
# this is, so we can process watches later while maintaining independence between tracepoints
_vars = self._var_lookup
self._var_lookup = {}
return collected_frames, _vars

def _process_frame(self, frame, process_vars):
# process the current frame info
Expand Down Expand Up @@ -172,13 +216,14 @@ def check_var_count(self):
return False
return True

def process_watch_result_breadth_first(self, watch: str, result: any):
def process_watch_result_breadth_first(self, watch: str, result: any) -> (
Tuple)[VariableId, Dict[str, Variable], str]:

identity_hash_id = str(id(result))
check_id = self.check_id(identity_hash_id)
if check_id is not None:
# this means the watch result is already in the var_lookup
return VariableId(check_id, watch), {}
return VariableId(check_id, watch), {}, str(result)

# else this is an unknown value so process breadth first
var_ids = []
Expand All @@ -201,7 +246,7 @@ def add_child(self, child):

self._var_lookup[var_id] = Variable(str(variable_type.__name__), variable_value_str, identity_hash_id, [],
truncated)
return VariableId(var_id, watch), self._var_lookup
return VariableId(var_id, watch), self._var_lookup, str(result)

def check_id(self, identity_hash_id):
if identity_hash_id in self._var_cache:
Expand Down
16 changes: 14 additions & 2 deletions src/deep/processor/frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from deep import logging
from deep.api.attributes import BoundedAttributes
from deep.api.tracepoint import TracePointConfig, EventSnapshot
from deep.api.tracepoint.tracepoint_config import LOG_MSG
from deep.config import ConfigService
from deep.processor.frame_collector import FrameCollector

Expand Down Expand Up @@ -43,8 +44,19 @@ def collect(self):
snapshot = EventSnapshot(tp, self._ts, self._config.resource, stack, variables)
# process the snapshot watches
for watch in tp.watches:
result, watch_lookup = self.eval_watch(watch)
snapshot.add_watch_result(result, watch_lookup)
result, watch_lookup, _ = self.eval_watch(watch)
snapshot.add_watch_result(result)
snapshot.merge_var_lookup(watch_lookup)

log_msg = tp.get_arg(LOG_MSG, None)
if log_msg is not None:
processed_log, watch_results, watch_lookup = self.process_log(tp, log_msg)
snapshot.log_msg = processed_log
for watch in watch_results:
snapshot.add_watch_result(watch)
snapshot.merge_var_lookup(watch_lookup)
self.log_tracepoint(processed_log, tp.id, format(snapshot.id, "016x"))

# process the snapshot attributes
attributes = self.process_attributes(tp)
snapshot.attributes.merge_in(attributes)
Expand Down
4 changes: 3 additions & 1 deletion src/deep/processor/trigger_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
import sys
import threading

from deep.config import ConfigService
from deep.config.tracepoint_config import ConfigUpdateListener
from deep.processor.frame_processor import FrameProcessor
from deep.push import PushService


def add_or_get(target, key, default_value):
Expand Down Expand Up @@ -51,7 +53,7 @@ class TriggerHandler:
should collect data.
"""

def __init__(self, config, push_service):
def __init__(self, config: ConfigService, push_service: PushService):
self._push_service = push_service
self._tp_config = []
self._config = config
Expand Down
3 changes: 2 additions & 1 deletion src/deep/push/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ def convert_snapshot(snapshot: EventSnapshot) -> Snapshot:
attributes=[KeyValue(key=k, value=convert_value(v)) for k, v in snapshot.attributes.items()],
duration_nanos=snapshot.duration_nanos,
resource=[KeyValue(key=k, value=convert_value(v)) for k, v in
snapshot.resource.attributes.items()])
snapshot.resource.attributes.items()],
log_msg=snapshot.log_msg)
except Exception:
logging.exception("Error converting to protobuf")
return Snapshot()
6 changes: 6 additions & 0 deletions test/test_deep/processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,9 @@
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

class MockFrame:
def __init__(self, _locals=None):
if _locals is None:
_locals = {}
self.f_locals = _locals
Loading