diff --git a/pyproject.toml b/pyproject.toml index b8b27840..fa5a0300 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,7 +59,7 @@ openai-agents = [ "temporalio[openai-agents] >= 1.15.0", ] pydantic-converter = ["pydantic>=2.10.6,<3"] -sentry = ["sentry-sdk>=1.11.0,<2"] +sentry = ["sentry-sdk>=2.13.0"] trio-async = [ "trio>=0.28.0,<0.29", "trio-asyncio>=0.15.0,<0.16", @@ -143,5 +143,4 @@ ignore_errors = true [[tool.mypy.overrides]] module = "opentelemetry.*" -ignore_errors = true - +ignore_errors = true \ No newline at end of file diff --git a/sentry/README.md b/sentry/README.md index 33a7b535..1cc75cf6 100644 --- a/sentry/README.md +++ b/sentry/README.md @@ -1,19 +1,40 @@ # Sentry Sample -This sample shows how to configure [Sentry](https://sentry.io) to intercept and capture errors from the Temporal SDK. +This sample shows how to configure [Sentry](https://sentry.io) SDK (version 2) to intercept and capture errors from the Temporal SDK +for workflows and activities. The integration adds some useful context to the errors, such as the activity type, task queue, etc. + +## Further details + +This is a small modification of the original example Sentry integration in this repo based on SDK v1. The integration +didn't work properly with Sentry SDK v2 due to some internal changes in the Sentry SDK that broke the worker sandbox. +Additionally, the v1 SDK has been deprecated and is only receiving security patches and will reach EOL some time in the future. +If you still need to use Sentry SDK v1, check the original example at this [commit](https://github.com/temporalio/samples-python/blob/090b96d750bafc10d4aad5ad506bb2439c413d5e/sentry). + +## Running the Sample For this sample, the optional `sentry` dependency group must be included. To include, run: - uv sync --group sentry + uv sync --no-default-groups --dev --group sentry + +> Note: this integration breaks when `gevent` is installed (e.g. by the gevent sample) so make sure to only install +> the `sentry` group and run the scripts below as described. To run, first see [README.md](../README.md) for prerequisites. Set `SENTRY_DSN` environment variable to the Sentry DSN. Then, run the following from the root directory to start the worker: + export SENTRY_DSN= # You'll need a Sentry account to test against + export ENVIRONMENT=dev uv run sentry/worker.py This will start the worker. Then, in another terminal, run the following to execute the workflow: uv run sentry/starter.py -The workflow should complete with the hello result. If you alter the workflow or the activity to raise an -`ApplicationError` instead, it should appear in Sentry. \ No newline at end of file +You should see the activity fail causing an error to be reported to Sentry. + +## Screenshot + +The screenshot below shows the extra tags and context included in the +Sentry error from the exception thrown in the activity. + +![Sentry screenshot](images/sentry.jpeg) diff --git a/sentry/activity.py b/sentry/activity.py new file mode 100644 index 00000000..148cd0d2 --- /dev/null +++ b/sentry/activity.py @@ -0,0 +1,25 @@ +from dataclasses import dataclass + +from temporalio import activity + + +@dataclass +class WorkingActivityInput: + message: str + + +@activity.defn +async def working_activity(input: WorkingActivityInput) -> str: + activity.logger.info("Running activity with parameter %s" % input) + return "Success" + + +@dataclass +class BrokenActivityInput: + message: str + + +@activity.defn +async def broken_activity(input: BrokenActivityInput) -> str: + activity.logger.info("Running activity with parameter %s" % input) + raise Exception("Activity failed!") diff --git a/sentry/images/sentry.jpeg b/sentry/images/sentry.jpeg new file mode 100644 index 00000000..0f62825b Binary files /dev/null and b/sentry/images/sentry.jpeg differ diff --git a/sentry/interceptor.py b/sentry/interceptor.py index eceba41d..d016acef 100644 --- a/sentry/interceptor.py +++ b/sentry/interceptor.py @@ -1,5 +1,6 @@ +import logging from dataclasses import asdict, is_dataclass -from typing import Any, Optional, Type, Union +from typing import Any, Optional, Type from temporalio import activity, workflow from temporalio.worker import ( @@ -12,63 +13,65 @@ ) with workflow.unsafe.imports_passed_through(): - from sentry_sdk import Hub, capture_exception, set_context, set_tag + import sentry_sdk -def _set_common_workflow_tags(info: Union[workflow.Info, activity.Info]): - set_tag("temporal.workflow.type", info.workflow_type) - set_tag("temporal.workflow.id", info.workflow_id) +logger = logging.getLogger(__name__) class _SentryActivityInboundInterceptor(ActivityInboundInterceptor): async def execute_activity(self, input: ExecuteActivityInput) -> Any: # https://docs.sentry.io/platforms/python/troubleshooting/#addressing-concurrency-issues - with Hub(Hub.current): - set_tag("temporal.execution_type", "activity") - set_tag("module", input.fn.__module__ + "." + input.fn.__qualname__) - + with sentry_sdk.isolation_scope() as scope: + scope.set_tag("temporal.execution_type", "activity") + scope.set_tag("module", input.fn.__module__ + "." + input.fn.__qualname__) activity_info = activity.info() - _set_common_workflow_tags(activity_info) - set_tag("temporal.activity.id", activity_info.activity_id) - set_tag("temporal.activity.type", activity_info.activity_type) - set_tag("temporal.activity.task_queue", activity_info.task_queue) - set_tag("temporal.workflow.namespace", activity_info.workflow_namespace) - set_tag("temporal.workflow.run_id", activity_info.workflow_run_id) + scope.set_tag("temporal.workflow.type", activity_info.workflow_type) + scope.set_tag("temporal.workflow.id", activity_info.workflow_id) + scope.set_tag("temporal.activity.id", activity_info.activity_id) + scope.set_tag("temporal.activity.type", activity_info.activity_type) + scope.set_tag("temporal.activity.task_queue", activity_info.task_queue) + scope.set_tag( + "temporal.workflow.namespace", activity_info.workflow_namespace + ) + scope.set_tag("temporal.workflow.run_id", activity_info.workflow_run_id) try: return await super().execute_activity(input) except Exception as e: if len(input.args) == 1: [arg] = input.args if is_dataclass(arg) and not isinstance(arg, type): - set_context("temporal.activity.input", asdict(arg)) - set_context("temporal.activity.info", activity.info().__dict__) - capture_exception() + scope.set_context("temporal.activity.input", asdict(arg)) + scope.set_context("temporal.activity.info", activity.info().__dict__) + scope.capture_exception() raise e class _SentryWorkflowInterceptor(WorkflowInboundInterceptor): async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any: # https://docs.sentry.io/platforms/python/troubleshooting/#addressing-concurrency-issues - with Hub(Hub.current): - set_tag("temporal.execution_type", "workflow") - set_tag("module", input.run_fn.__module__ + "." + input.run_fn.__qualname__) + with sentry_sdk.isolation_scope() as scope: + scope.set_tag("temporal.execution_type", "workflow") + scope.set_tag( + "module", input.run_fn.__module__ + "." + input.run_fn.__qualname__ + ) workflow_info = workflow.info() - _set_common_workflow_tags(workflow_info) - set_tag("temporal.workflow.task_queue", workflow_info.task_queue) - set_tag("temporal.workflow.namespace", workflow_info.namespace) - set_tag("temporal.workflow.run_id", workflow_info.run_id) + scope.set_tag("temporal.workflow.type", workflow_info.workflow_type) + scope.set_tag("temporal.workflow.id", workflow_info.workflow_id) + scope.set_tag("temporal.workflow.task_queue", workflow_info.task_queue) + scope.set_tag("temporal.workflow.namespace", workflow_info.namespace) + scope.set_tag("temporal.workflow.run_id", workflow_info.run_id) try: return await super().execute_workflow(input) except Exception as e: if len(input.args) == 1: [arg] = input.args if is_dataclass(arg) and not isinstance(arg, type): - set_context("temporal.workflow.input", asdict(arg)) - set_context("temporal.workflow.info", workflow.info().__dict__) - + scope.set_context("temporal.workflow.input", asdict(arg)) + scope.set_context("temporal.workflow.info", workflow.info().__dict__) if not workflow.unsafe.is_replaying(): with workflow.unsafe.sandbox_unrestricted(): - capture_exception() + scope.capture_exception() raise e @@ -78,9 +81,6 @@ class SentryInterceptor(Interceptor): def intercept_activity( self, next: ActivityInboundInterceptor ) -> ActivityInboundInterceptor: - """Implementation of - :py:meth:`temporalio.worker.Interceptor.intercept_activity`. - """ return _SentryActivityInboundInterceptor(super().intercept_activity(next)) def workflow_interceptor_class( diff --git a/sentry/starter.py b/sentry/starter.py index 9d0a0dc7..372a732c 100644 --- a/sentry/starter.py +++ b/sentry/starter.py @@ -1,9 +1,8 @@ import asyncio -import os from temporalio.client import Client -from sentry.worker import GreetingWorkflow +from sentry.workflow import SentryExampleWorkflow, SentryExampleWorkflowInput async def main(): @@ -11,13 +10,16 @@ async def main(): client = await Client.connect("localhost:7233") # Run workflow - result = await client.execute_workflow( - GreetingWorkflow.run, - "World", - id="sentry-workflow-id", - task_queue="sentry-task-queue", - ) - print(f"Workflow result: {result}") + try: + result = await client.execute_workflow( + SentryExampleWorkflow.run, + SentryExampleWorkflowInput(option="broken"), + id="sentry-workflow-id", + task_queue="sentry-task-queue", + ) + print(f"Workflow result: {result}") + except Exception: + print("Workflow failed - check Sentry for details") if __name__ == "__main__": diff --git a/sentry/worker.py b/sentry/worker.py index 1db0826b..723b8e52 100644 --- a/sentry/worker.py +++ b/sentry/worker.py @@ -1,64 +1,92 @@ import asyncio -import logging import os -from dataclasses import dataclass -from datetime import timedelta import sentry_sdk -from temporalio import activity, workflow +from sentry_sdk.integrations.asyncio import AsyncioIntegration +from sentry_sdk.types import Event, Hint from temporalio.client import Client from temporalio.worker import Worker +from temporalio.worker.workflow_sandbox import ( + SandboxedWorkflowRunner, + SandboxRestrictions, +) +from sentry.activity import broken_activity, working_activity from sentry.interceptor import SentryInterceptor +from sentry.workflow import SentryExampleWorkflow +interrupt_event = asyncio.Event() -@dataclass -class ComposeGreetingInput: - greeting: str - name: str +def before_send(event: Event, hint: Hint) -> Event | None: + # Filter out __ShutdownRequested events raised by the worker's internals + if str(hint.get("exc_info", [None])[0].__name__) == "_ShutdownRequested": + return None -@activity.defn -async def compose_greeting(input: ComposeGreetingInput) -> str: - activity.logger.info("Running activity with parameter %s" % input) - return f"{input.greeting}, {input.name}!" + return event -@workflow.defn -class GreetingWorkflow: - @workflow.run - async def run(self, name: str) -> str: - workflow.logger.info("Running workflow with parameter %s" % name) - return await workflow.execute_activity( - compose_greeting, - ComposeGreetingInput("Hello", name), - start_to_close_timeout=timedelta(seconds=10), +def initialise_sentry() -> None: + sentry_dsn = os.environ.get("SENTRY_DSN") + if not sentry_dsn: + print( + "SENTRY_DSN environment variable is not set. Sentry will not be initialized." ) + return + environment = os.environ.get("ENVIRONMENT") + sentry_sdk.init( + dsn=sentry_dsn, + environment=environment, + integrations=[ + AsyncioIntegration(), + ], + attach_stacktrace=True, + before_send=before_send, + ) + print(f"Sentry SDK initialized for environment: {environment!r}") -async def main(): - # Uncomment the line below to see logging - # logging.basicConfig(level=logging.INFO) +async def main(): # Initialize the Sentry SDK - sentry_sdk.init( - dsn=os.environ.get("SENTRY_DSN"), - ) + initialise_sentry() # Start client client = await Client.connect("localhost:7233") # Run a worker for the workflow - worker = Worker( + async with Worker( client, task_queue="sentry-task-queue", - workflows=[GreetingWorkflow], - activities=[compose_greeting], + workflows=[SentryExampleWorkflow], + activities=[broken_activity, working_activity], interceptors=[SentryInterceptor()], # Use SentryInterceptor for error reporting - ) - - await worker.run() + workflow_runner=SandboxedWorkflowRunner( + restrictions=SandboxRestrictions.default.with_passthrough_modules( + "sentry_sdk" + ) + ), + ): + # Wait until interrupted + print("Worker started, ctrl+c to exit") + await interrupt_event.wait() + print("Shutting down") if __name__ == "__main__": - asyncio.run(main()) + # Note: "Addressing Concurrency Issues" section in Sentry docs recommends using + # the AsyncioIntegration: "If you do concurrency with asyncio coroutines, make + # sure to use the AsyncioIntegration which will clone the correct scope in your Tasks" + # See https://docs.sentry.io/platforms/python/troubleshooting/ + # + # However, this captures all unhandled exceptions in the event loop. + # So handle shutdown gracefully to avoid CancelledError and KeyboardInterrupt + # exceptions being captured as errors. Sentry also captures the worker's + # _ShutdownRequested exception, which is probably not useful. We've filtered this + # out in Sentry's before_send function. + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(main()) + except KeyboardInterrupt: + interrupt_event.set() + loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/sentry/workflow.py b/sentry/workflow.py new file mode 100644 index 00000000..4cb779ea --- /dev/null +++ b/sentry/workflow.py @@ -0,0 +1,38 @@ +import typing +from dataclasses import dataclass +from datetime import timedelta + +from temporalio import workflow +from temporalio.common import RetryPolicy + +from sentry.activity import WorkingActivityInput, working_activity + +with workflow.unsafe.imports_passed_through(): + from sentry.activity import BrokenActivityInput, broken_activity + + +@dataclass +class SentryExampleWorkflowInput: + option: typing.Literal["working", "broken"] + + +@workflow.defn +class SentryExampleWorkflow: + @workflow.run + async def run(self, input: SentryExampleWorkflowInput) -> str: + workflow.logger.info("Running workflow with parameter %r" % input) + + if input.option == "working": + return await workflow.execute_activity( + working_activity, + WorkingActivityInput(message="Hello, Temporal!"), + start_to_close_timeout=timedelta(seconds=10), + retry_policy=RetryPolicy(maximum_attempts=1), + ) + + return await workflow.execute_activity( + broken_activity, + BrokenActivityInput(message="Hello, Temporal!"), + start_to_close_timeout=timedelta(seconds=10), + retry_policy=RetryPolicy(maximum_attempts=1), + ) diff --git a/tests/sentry/fake_sentry_transport.py b/tests/sentry/fake_sentry_transport.py new file mode 100644 index 00000000..7d1b087d --- /dev/null +++ b/tests/sentry/fake_sentry_transport.py @@ -0,0 +1,17 @@ +import sentry_sdk +import sentry_sdk.types + + +class FakeSentryTransport: + """A fake transport that captures Sentry events in memory""" + + # Note: we could extend from sentry_sdk.transport.Transport + # but `sentry_sdk.init` also takes a simple callable that takes + # an Event rather than a serialised Envelope object, so testing + # is easier. + + def __init__(self): + self.events: list[sentry_sdk.types.Event] = [] + + def __call__(self, event: sentry_sdk.types.Event) -> None: + self.events.append(event) diff --git a/tests/sentry/test_interceptor.py b/tests/sentry/test_interceptor.py new file mode 100644 index 00000000..731bad8c --- /dev/null +++ b/tests/sentry/test_interceptor.py @@ -0,0 +1,156 @@ +import unittest.mock +from collections import abc + +import pytest +import sentry_sdk +import temporalio.activity +import temporalio.workflow +from sentry_sdk.integrations.asyncio import AsyncioIntegration +from temporalio.client import Client +from temporalio.worker import Worker +from temporalio.worker.workflow_sandbox import ( + SandboxedWorkflowRunner, + SandboxRestrictions, +) + +from sentry.activity import broken_activity, working_activity +from sentry.interceptor import SentryInterceptor +from sentry.workflow import SentryExampleWorkflow, SentryExampleWorkflowInput +from tests.sentry.fake_sentry_transport import FakeSentryTransport + + +@pytest.fixture +def transport() -> FakeSentryTransport: + """Fixture to provide a fake transport for Sentry SDK.""" + return FakeSentryTransport() + + +@pytest.fixture(autouse=True) +def sentry_init(transport: FakeSentryTransport) -> None: + """Initialize Sentry for testing.""" + sentry_sdk.init( + transport=transport, + integrations=[ + AsyncioIntegration(), + ], + ) + + +@pytest.fixture +async def worker(client: Client) -> abc.AsyncIterator[Worker]: + """Fixture to provide a worker for testing.""" + async with Worker( + client, + task_queue="sentry-task-queue", + workflows=[SentryExampleWorkflow], + activities=[broken_activity, working_activity], + interceptors=[SentryInterceptor()], + workflow_runner=SandboxedWorkflowRunner( + restrictions=SandboxRestrictions.default.with_passthrough_modules( + "sentry_sdk" + ) + ), + ) as worker: + yield worker + + +async def test_sentry_interceptor_reports_no_errors_when_workflow_succeeds( + client: Client, worker: Worker, transport: FakeSentryTransport +) -> None: + """Test that Sentry interceptor reports no errors when workflow succeeds.""" + # WHEN + try: + await client.execute_workflow( + SentryExampleWorkflow.run, + SentryExampleWorkflowInput(option="working"), + id="sentry-workflow-id", + task_queue=worker.task_queue, + ) + except Exception: + pytest.fail("Workflow should not raise an exception") + + # THEN + assert len(transport.events) == 0, "No events should be captured" + + +async def test_sentry_interceptor_captures_errors( + client: Client, worker: Worker, transport: FakeSentryTransport +) -> None: + """Test that errors are captured with correct Sentry metadata.""" + # WHEN + try: + await client.execute_workflow( + SentryExampleWorkflow.run, + SentryExampleWorkflowInput(option="broken"), + id="sentry-workflow-id", + task_queue=worker.task_queue, + ) + pytest.fail("Workflow should raise an exception") + except Exception: + pass + + # THEN + # there should be two events: one for the failed activity and one for the failed workflow + assert len(transport.events) == 2, "Two events should be captured" + + # Check the first event - should be the activity exception + # -------------------------------------------------------- + event = transport.events[0] + + # Check exception was captured + assert event["exception"]["values"][0]["type"] == "Exception" + assert event["exception"]["values"][0]["value"] == "Activity failed!" + + # Check useful metadata were captured as tags + assert event["tags"] == { + "temporal.execution_type": "activity", + "module": "sentry.activity.broken_activity", + "temporal.workflow.type": "SentryExampleWorkflow", + "temporal.workflow.id": "sentry-workflow-id", + "temporal.activity.id": "1", + "temporal.activity.type": "broken_activity", + "temporal.activity.task_queue": "sentry-task-queue", + "temporal.workflow.namespace": "default", + "temporal.workflow.run_id": unittest.mock.ANY, + } + + # Check activity input was captured as context + assert event["contexts"]["temporal.activity.input"] == { + "message": "Hello, Temporal!", + } + + # Check activity info was captured as context + activity_info = temporalio.activity.Info( + **event["contexts"]["temporal.activity.info"] # type: ignore + ) + assert activity_info.activity_type == "broken_activity" + + # Check the second event - should be the workflow exception + # --------------------------------------------------------- + event = transport.events[1] + + # Check exception was captured + assert event["exception"]["values"][0]["type"] == "ApplicationError" + assert event["exception"]["values"][0]["value"] == "Activity failed!" + + # Check useful metadata were captured as tags + assert event["tags"] == { + "temporal.execution_type": "workflow", + "module": "sentry.workflow.SentryExampleWorkflow.run", + "temporal.workflow.type": "SentryExampleWorkflow", + "temporal.workflow.id": "sentry-workflow-id", + "temporal.workflow.task_queue": "sentry-task-queue", + "temporal.workflow.namespace": "default", + "temporal.workflow.run_id": unittest.mock.ANY, + } + + # Check workflow input was captured as context + assert event["contexts"]["temporal.workflow.input"] == { + "option": "broken", + } + + # Check workflow info was captured as context + workflow_info = temporalio.workflow.Info( + **event["contexts"]["temporal.workflow.info"] # type: ignore + ) + assert workflow_info.workflow_type == "SentryExampleWorkflow" diff --git a/uv.lock b/uv.lock index 8fc1c4b4..9b1309d1 100644 --- a/uv.lock +++ b/uv.lock @@ -2477,15 +2477,15 @@ wheels = [ [[package]] name = "sentry-sdk" -version = "1.45.1" +version = "2.34.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "certifi" }, { name = "urllib3" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/c8/28/02c0cd9184f9108e3c52519f9628b215077a3854240e0b17ae845e664855/sentry_sdk-1.45.1.tar.gz", hash = "sha256:a16c997c0f4e3df63c0fc5e4207ccb1ab37900433e0f72fef88315d317829a26", size = 244774, upload-time = "2024-07-26T13:48:32.375Z" } +sdist = { url = "https://files.pythonhosted.org/packages/3a/38/10d6bfe23df1bfc65ac2262ed10b45823f47f810b0057d3feeea1ca5c7ed/sentry_sdk-2.34.1.tar.gz", hash = "sha256:69274eb8c5c38562a544c3e9f68b5be0a43be4b697f5fd385bf98e4fbe672687", size = 336969, upload-time = "2025-07-30T11:13:37.93Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/fe/9f/105366a122efa93f0cb1914f841747d160788e4d022d0488d2d44c2ba26c/sentry_sdk-1.45.1-py2.py3-none-any.whl", hash = "sha256:608887855ccfe39032bfd03936e3a1c4f4fc99b3a4ac49ced54a4220de61c9c1", size = 267163, upload-time = "2024-07-26T13:48:29.38Z" }, + { url = "https://files.pythonhosted.org/packages/2d/3e/bb34de65a5787f76848a533afbb6610e01fbcdd59e76d8679c254e02255c/sentry_sdk-2.34.1-py2.py3-none-any.whl", hash = "sha256:b7a072e1cdc5abc48101d5146e1ae680fa81fe886d8d95aaa25a0b450c818d32", size = 357743, upload-time = "2025-07-30T11:13:36.145Z" }, ] [[package]] @@ -2748,7 +2748,7 @@ openai-agents = [ { name = "temporalio", extras = ["openai-agents"], specifier = ">=1.15.0" }, ] pydantic-converter = [{ name = "pydantic", specifier = ">=2.10.6,<3" }] -sentry = [{ name = "sentry-sdk", specifier = ">=1.11.0,<2" }] +sentry = [{ name = "sentry-sdk", specifier = ">=2.13.0" }] trio-async = [ { name = "trio", specifier = ">=0.28.0,<0.29" }, { name = "trio-asyncio", specifier = ">=0.15.0,<0.16" },