From b768b5c307eb2116ddedaf386c16a8e4c22e8c13 Mon Sep 17 00:00:00 2001
From: Zhenay
Date: Fri, 27 Jan 2023 20:23:11 +0300
Subject: [PATCH 01/14] Initial arq setup

---
 .github/workflows/test-integration-arq.yml | 73 ++++++++++++++++++++++
 sentry_sdk/integrations/arq.py             | 27 ++++++++
 setup.py                                   |  1 +
 tests/integrations/arq/__init__.py         |  3 +
 tests/integrations/arq/test_arq.py         | 40 ++++++++++++
 tox.ini                                    | 16 +++--
 6 files changed, 156 insertions(+), 4 deletions(-)
 create mode 100644 .github/workflows/test-integration-arq.yml
 create mode 100644 sentry_sdk/integrations/arq.py
 create mode 100644 tests/integrations/arq/__init__.py
 create mode 100644 tests/integrations/arq/test_arq.py

diff --git a/.github/workflows/test-integration-arq.yml b/.github/workflows/test-integration-arq.yml
new file mode 100644
index 0000000000..2eee836bc1
--- /dev/null
+++ b/.github/workflows/test-integration-arq.yml
@@ -0,0 +1,73 @@
+name: Test arq
+
+on:
+  push:
+    branches:
+      - master
+      - release/**
+
+  pull_request:
+
+# Cancel in-progress workflows on pull_requests.
+# https://docs.github.com/en/actions/using-jobs/using-concurrency#example-using-a-fallback-value
+concurrency:
+  group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
+  cancel-in-progress: true
+
+permissions:
+  contents: read
+
+env:
+  BUILD_CACHE_KEY: ${{ github.sha }}
+  CACHED_BUILD_PATHS: |
+    ${{ github.workspace }}/dist-serverless
+
+jobs:
+  test:
+    name: arq, python ${{ matrix.python-version }}, ${{ matrix.os }}
+    runs-on: ${{ matrix.os }}
+    timeout-minutes: 45
+
+    strategy:
+      fail-fast: false
+      matrix:
+        python-version: ["3.7","3.8","3.9","3.10","3.11"]
+        # python3.6 reached EOL and is no longer supported on
+        # new versions of GitHub Actions hosted runners;
+        # ubuntu-20.04 is the last version that supported python3.6.
+        # See https://github.com/actions/setup-python/issues/544#issuecomment-1332535877
+        os: [ubuntu-20.04]
+
+    steps:
+      - uses: actions/checkout@v3
+      - uses: actions/setup-python@v4
+        with:
+          python-version: ${{ matrix.python-version }}
+
+      - name: Setup Test Env
+        run: |
+          pip install codecov "tox>=3,<4"
+
+      - name: Test arq
+        timeout-minutes: 45
+        shell: bash
+        run: |
+          set -x # print commands that are executed
+          coverage erase
+
+          ./scripts/runtox.sh "${{ matrix.python-version }}-arq" --cov=tests --cov=sentry_sdk --cov-report= --cov-branch
+          coverage combine .coverage*
+          coverage xml -i
+          codecov --file coverage.xml
+
+  check_required_tests:
+    name: All arq tests passed or skipped
+    needs: test
+    # Always run this, even if a dependent job failed
+    if: always()
+    runs-on: ubuntu-20.04
+    steps:
+      - name: Check for failures
+        if: contains(needs.test.result, 'failure')
+        run: |
+          echo "One of the dependent jobs has failed. You may need to re-run it." 
&& exit 1 diff --git a/sentry_sdk/integrations/arq.py b/sentry_sdk/integrations/arq.py new file mode 100644 index 0000000000..351d5145fb --- /dev/null +++ b/sentry_sdk/integrations/arq.py @@ -0,0 +1,27 @@ +from __future__ import absolute_import + +from sentry_sdk.integrations import DidNotEnable, Integration + +try: + from arq.version import VERSION as ARQ_VERSION +except ImportError: + raise DidNotEnable("Arq is not installed") + + +class ArqIntegration(Integration): + identifier = "arq" + + @staticmethod + def setup_once(): + # type: () -> None + + try: + if isinstance(ARQ_VERSION, str): + version = tuple(map(int, ARQ_VERSION.split(".")[:2])) + else: + version = ARQ_VERSION.version[:2] + except (TypeError, ValueError): + raise DidNotEnable("arq version unparsable: {}".format(ARQ_VERSION)) + + if version < (0, 23): + raise DidNotEnable("arq 0.23 or newer required.") diff --git a/setup.py b/setup.py index 907158dfbb..3d8e392e38 100644 --- a/setup.py +++ b/setup.py @@ -53,6 +53,7 @@ def get_file_text(file_name): "celery": ["celery>=3"], "huey": ["huey>=2"], "beam": ["apache-beam>=2.12"], + "arq": ["arq>=0.23"], "rq": ["rq>=0.6"], "aiohttp": ["aiohttp>=3.5"], "tornado": ["tornado>=5"], diff --git a/tests/integrations/arq/__init__.py b/tests/integrations/arq/__init__.py new file mode 100644 index 0000000000..f0b4712255 --- /dev/null +++ b/tests/integrations/arq/__init__.py @@ -0,0 +1,3 @@ +import pytest + +pytest.importorskip("arq") diff --git a/tests/integrations/arq/test_arq.py b/tests/integrations/arq/test_arq.py new file mode 100644 index 0000000000..685ffe529b --- /dev/null +++ b/tests/integrations/arq/test_arq.py @@ -0,0 +1,40 @@ +import pytest + +from sentry_sdk.integrations.arq import ArqIntegration + +from arq.connections import ArqRedis +from arq.worker import Worker + +from fakeredis.aioredis import FakeRedis + + +@pytest.fixture(autouse=True) +def patch_fakeredis_info_command(): + from fakeredis._fakesocket import FakeSocket + + if not hasattr(FakeSocket, "info"): + from fakeredis._commands import command + from fakeredis._helpers import SimpleString + + @command((SimpleString,), name="info") + def info(self, section): + return section + + FakeSocket.info = info + + +@pytest.fixture +def init_arq(sentry_init): + def inner(functions): + sentry_init( + integrations=[ArqIntegration()], + traces_sample_rate=1.0, + send_default_pii=True, + debug=True, + ) + + server = FakeRedis() + pool = ArqRedis(pool_or_conn=server.connection_pool) + return pool, Worker(functions, redis_pool=pool) + + return inner diff --git a/tox.ini b/tox.ini index cda2e6ccf6..8c62c20a7f 100644 --- a/tox.ini +++ b/tox.ini @@ -22,6 +22,9 @@ envlist = {py3.7}-aiohttp-v{3.5} {py3.7,py3.8,py3.9,py3.10,py3.11}-aiohttp-v{3.6} + # Arq + {py3.7,py3.8,py3.9,py3.10,py3.11}-arq + # Asgi {py3.7,py3.8,py3.9,py3.10,py3.11}-asgi @@ -79,9 +82,9 @@ envlist = # HTTPX {py3.6,py3.7,py3.8,py3.9,py3.10,py3.11}-httpx-v{0.16,0.17} - + # Huey - {py2.7,py3.5,py3.6,py3.7,py3.8,py3.9,py3.10,py3.11}-huey-2 + {py2.7,py3.5,py3.6,py3.7,py3.8,py3.9,py3.10,py3.11}-huey-2 # OpenTelemetry (OTel) {py3.7,py3.8,py3.9,py3.10,py3.11}-opentelemetry @@ -159,6 +162,10 @@ deps = aiohttp-v3.5: aiohttp>=3.5.0,<3.6.0 aiohttp: pytest-aiohttp + # Arq + arq: arq>=0.23.0 + arq: fakeredis>=2.2.0 + # Asgi asgi: pytest-asyncio asgi: async-asgi-testclient @@ -264,10 +271,10 @@ deps = # HTTPX httpx-v0.16: httpx>=0.16,<0.17 httpx-v0.17: httpx>=0.17,<0.18 - + # Huey huey-2: huey>=2.0 - + # OpenTelemetry (OTel) opentelemetry: opentelemetry-distro @@ -376,6 +383,7 @@ setenv 
= PYTHONDONTWRITEBYTECODE=1 TESTPATH=tests aiohttp: TESTPATH=tests/integrations/aiohttp + arq: TESTPATH=tests/integrations/arq asgi: TESTPATH=tests/integrations/asgi aws_lambda: TESTPATH=tests/integrations/aws_lambda beam: TESTPATH=tests/integrations/beam From 2d154fdb13d97e926b6c57f0519d9dac35558c84 Mon Sep 17 00:00:00 2001 From: Zhenay Date: Fri, 27 Jan 2023 21:06:08 +0300 Subject: [PATCH 02/14] Patch enqueue_job --- sentry_sdk/consts.py | 2 ++ sentry_sdk/integrations/arq.py | 33 +++++++++++++++++++++ tests/integrations/arq/test_arq.py | 46 ++++++++++++++++++++++++++++++ 3 files changed, 81 insertions(+) diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py index b2d1ae26c7..6398cb3a80 100644 --- a/sentry_sdk/consts.py +++ b/sentry_sdk/consts.py @@ -69,6 +69,8 @@ class OP: MIDDLEWARE_STARLITE = "middleware.starlite" MIDDLEWARE_STARLITE_RECEIVE = "middleware.starlite.receive" MIDDLEWARE_STARLITE_SEND = "middleware.starlite.send" + QUEUE_SUBMIT_ARQ = "queue.submit.arq" + QUEUE_TASK_ARQ = "queue.task.arq" QUEUE_SUBMIT_CELERY = "queue.submit.celery" QUEUE_TASK_CELERY = "queue.task.celery" QUEUE_TASK_RQ = "queue.task.rq" diff --git a/sentry_sdk/integrations/arq.py b/sentry_sdk/integrations/arq.py index 351d5145fb..7dbff695e6 100644 --- a/sentry_sdk/integrations/arq.py +++ b/sentry_sdk/integrations/arq.py @@ -1,12 +1,24 @@ from __future__ import absolute_import +from sentry_sdk._types import MYPY +from sentry_sdk import Hub +from sentry_sdk.consts import OP from sentry_sdk.integrations import DidNotEnable, Integration +from sentry_sdk.integrations.logging import ignore_logger try: from arq.version import VERSION as ARQ_VERSION + from arq.connections import ArqRedis + from arq.jobs import Job + from arq.worker import JobExecutionFailed, Retry, RetryJob except ImportError: raise DidNotEnable("Arq is not installed") +if MYPY: + from typing import Any, Optional + +ARQ_CONTROL_FLOW_EXCEPTIONS = (JobExecutionFailed, Retry, RetryJob) + class ArqIntegration(Integration): identifier = "arq" @@ -25,3 +37,24 @@ def setup_once(): if version < (0, 23): raise DidNotEnable("arq 0.23 or newer required.") + + patch_enqueue_job() + + ignore_logger("arq.worker") + + +def patch_enqueue_job(): + # type: () -> None + old_enqueue_job = ArqRedis.enqueue_job + + async def _sentry_enqueue_job(self, function, *args, **kwargs): + # type: (ArqRedis, str, *Any, **Any) -> Optional[Job] + hub = Hub.current + + if hub.get_integration(ArqIntegration) is None: + return await old_enqueue_job(self, function, *args, **kwargs) + + with hub.start_span(op=OP.QUEUE_SUBMIT_ARQ, description=function): + return await old_enqueue_job(self, function, *args, **kwargs) + + ArqRedis.enqueue_job = _sentry_enqueue_job diff --git a/tests/integrations/arq/test_arq.py b/tests/integrations/arq/test_arq.py index 685ffe529b..6797c90514 100644 --- a/tests/integrations/arq/test_arq.py +++ b/tests/integrations/arq/test_arq.py @@ -1,8 +1,11 @@ import pytest +from sentry_sdk import start_transaction from sentry_sdk.integrations.arq import ArqIntegration from arq.connections import ArqRedis +from arq.jobs import Job +from arq.utils import timestamp_ms from arq.worker import Worker from fakeredis.aioredis import FakeRedis @@ -38,3 +41,46 @@ def inner(functions): return pool, Worker(functions, redis_pool=pool) return inner + + +@pytest.mark.asyncio +async def test_job_result(init_arq): + async def increase(ctx, num): + return num + 1 + + increase.__qualname__ = increase.__name__ + + pool, worker = init_arq([increase]) + + job = await 
pool.enqueue_job("increase", 3) + + assert isinstance(job, Job) + + await worker.run_job(job.job_id, timestamp_ms()) + result = await job.result() + job_result = await job.result_info() + + assert result == 4 + assert job_result.result == 4 + + +@pytest.mark.asyncio +async def test_enqueue_job(capture_events, init_arq): + async def dummy_job(_): + pass + + pool, _ = init_arq([dummy_job]) + + events = capture_events() + + with start_transaction() as transaction: + await pool.enqueue_job("dummy_job") + + (event,) = events + + assert event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert event["contexts"]["trace"]["span_id"] == transaction.span_id + + assert len(event["spans"]) + assert event["spans"][0]["op"] == "queue.submit.arq" + assert event["spans"][0]["description"] == "dummy_job" From c51fca883a58d4adbfe336ac31f242fb7a004ef7 Mon Sep 17 00:00:00 2001 From: Zhenay Date: Sat, 28 Jan 2023 13:21:16 +0300 Subject: [PATCH 03/14] Patch run_job --- sentry_sdk/integrations/arq.py | 115 ++++++++++++++++++++++++++++- tests/integrations/arq/test_arq.py | 39 +++++++++- 2 files changed, 148 insertions(+), 6 deletions(-) diff --git a/sentry_sdk/integrations/arq.py b/sentry_sdk/integrations/arq.py index 7dbff695e6..130c66939e 100644 --- a/sentry_sdk/integrations/arq.py +++ b/sentry_sdk/integrations/arq.py @@ -2,21 +2,30 @@ from sentry_sdk._types import MYPY from sentry_sdk import Hub -from sentry_sdk.consts import OP +from sentry_sdk.consts import OP, SENSITIVE_DATA_SUBSTITUTE +from sentry_sdk.hub import _should_send_default_pii from sentry_sdk.integrations import DidNotEnable, Integration from sentry_sdk.integrations.logging import ignore_logger +from sentry_sdk.tracing import Transaction, TRANSACTION_SOURCE_TASK +from sentry_sdk.utils import capture_internal_exceptions try: + import arq.worker from arq.version import VERSION as ARQ_VERSION from arq.connections import ArqRedis - from arq.jobs import Job - from arq.worker import JobExecutionFailed, Retry, RetryJob + from arq.worker import JobExecutionFailed, Retry, RetryJob, Worker except ImportError: raise DidNotEnable("Arq is not installed") if MYPY: from typing import Any, Optional + from sentry_sdk._types import EventProcessor, Event, Hint + + from arq.jobs import Job + from arq.typing import WorkerCoroutine + from arq.worker import Function + ARQ_CONTROL_FLOW_EXCEPTIONS = (JobExecutionFailed, Retry, RetryJob) @@ -39,6 +48,8 @@ def setup_once(): raise DidNotEnable("arq 0.23 or newer required.") patch_enqueue_job() + patch_run_job() + patch_func() ignore_logger("arq.worker") @@ -58,3 +69,101 @@ async def _sentry_enqueue_job(self, function, *args, **kwargs): return await old_enqueue_job(self, function, *args, **kwargs) ArqRedis.enqueue_job = _sentry_enqueue_job + + +def patch_run_job(): + # type: () -> None + old_run_job = Worker.run_job + + async def _sentry_run_job(self, job_id, score): + # type: (Worker, str, int) -> None + hub = Hub.current + + if hub.get_integration(ArqIntegration) is None: + return await old_run_job(self, job_id, score) + + with hub.push_scope() as scope: + scope._name = "arq" + scope.clear_breadcrumbs() + + transaction = Transaction( + name="unknown arq task", + status="ok", + op=OP.QUEUE_TASK_ARQ, + source=TRANSACTION_SOURCE_TASK, + ) + + with hub.start_transaction(transaction): + return await old_run_job(self, job_id, score) + + Worker.run_job = _sentry_run_job + + +def _make_event_processor(ctx, *args, **kwargs): + # type: (dict, *Any, **Any) -> EventProcessor + def event_processor(event, hint): + # type: 
(Event, Hint) -> Optional[Event]
+
+        hub = Hub.current
+
+        with capture_internal_exceptions():
+            if hub.scope.transaction is not None:
+                hub.scope.transaction.name = ctx["job_name"]
+                event["transaction"] = ctx["job_name"]
+
+            tags = event.setdefault("tags", {})
+            tags["arq_task_id"] = ctx["job_id"]
+            tags["arq_task_retry"] = ctx["job_try"] > 1
+            extra = event.setdefault("extra", {})
+            extra["arq-job"] = {
+                "task": ctx["job_name"],
+                "args": args
+                if _should_send_default_pii()
+                else SENSITIVE_DATA_SUBSTITUTE,
+                "kwargs": kwargs
+                if _should_send_default_pii()
+                else SENSITIVE_DATA_SUBSTITUTE,
+                "retry": ctx["job_try"],
+            }
+
+        return event
+
+    return event_processor
+
+
+def _wrap_coroutine(name, coroutine):
+    # type: (str, WorkerCoroutine) -> WorkerCoroutine
+    async def _sentry_coroutine(ctx, *args, **kwargs):
+        # type: (dict, *Any, **Any) -> Any
+        hub = Hub.current
+        if hub.get_integration(ArqIntegration) is None:
+            return await coroutine(ctx, *args, **kwargs)
+
+        hub.scope.add_event_processor(
+            _make_event_processor({**ctx, "job_name": name}, *args, **kwargs)
+        )
+
+        return await coroutine(ctx, *args, **kwargs)
+
+    return _sentry_coroutine
+
+
+def patch_func():
+    old_func = arq.worker.func
+
+    def _sentry_func(*args, **kwargs):
+        # type: (*Any, **Any) -> Function
+        hub = Hub.current
+
+        if hub.get_integration(ArqIntegration) is None:
+            return old_func(*args, **kwargs)
+
+        func = old_func(*args, **kwargs)
+
+        if not getattr(func, "_sentry_is_patched", False):
+            func.coroutine = _wrap_coroutine(func.name, func.coroutine)
+            func._sentry_is_patched = True
+
+        return func
+
+    arq.worker.func = _sentry_func
diff --git a/tests/integrations/arq/test_arq.py b/tests/integrations/arq/test_arq.py
index 6797c90514..7f77c0f82d 100644
--- a/tests/integrations/arq/test_arq.py
+++ b/tests/integrations/arq/test_arq.py
@@ -6,7 +6,7 @@
 from arq.connections import ArqRedis
 from arq.jobs import Job
 from arq.utils import timestamp_ms
-from arq.worker import Worker
+from arq.worker import Retry, Worker
 
 from fakeredis.aioredis import FakeRedis
 
@@ -28,7 +28,7 @@ def info(self, section):
 
 @pytest.fixture
 def init_arq(sentry_init):
-    def inner(functions):
+    def inner(functions, allow_abort_jobs=False):
         sentry_init(
             integrations=[ArqIntegration()],
             traces_sample_rate=1.0,
@@ -38,7 +38,9 @@ def inner(functions):
 
         server = FakeRedis()
         pool = ArqRedis(pool_or_conn=server.connection_pool)
-        return pool, Worker(functions, redis_pool=pool)
+        return pool, Worker(
+            functions, redis_pool=pool, allow_abort_jobs=allow_abort_jobs
+        )
 
     return inner
 
@@ -64,6 +66,37 @@ async def increase(ctx, num):
     assert job_result.result == 4
 
 
+@pytest.mark.asyncio
+async def test_job_retry(capture_events, init_arq):
+    async def retry_job(ctx):
+        if ctx["job_try"] < 2:
+            raise Retry
+
+    retry_job.__qualname__ = retry_job.__name__
+
+    pool, worker = init_arq([retry_job])
+
+    job = await pool.enqueue_job("retry_job")
+
+    events = capture_events()
+
+    await worker.run_job(job.job_id, timestamp_ms())
+
+    event = events.pop(0)
+    assert event["contexts"]["trace"]["status"] == "aborted"
+    assert event["transaction"] == "retry_job"
+    assert event["tags"]["arq_task_id"] == job.job_id
+    assert event["extra"]["arq-job"]["retry"] == 1
+
+    await worker.run_job(job.job_id, timestamp_ms())
+
+    event = events.pop(0)
+    assert event["contexts"]["trace"]["status"] == "ok"
+    assert event["transaction"] == "retry_job"
+    assert event["tags"]["arq_task_id"] == job.job_id
+    assert event["extra"]["arq-job"]["retry"] == 2
+
+
 @pytest.mark.asyncio
 async def 
test_enqueue_job(capture_events, init_arq): async def dummy_job(_): From e3600ba657b3f0d0d203e3e6912257d75233e731 Mon Sep 17 00:00:00 2001 From: Zhenay Date: Sat, 28 Jan 2023 20:25:38 +0300 Subject: [PATCH 04/14] Capture exceptions --- sentry_sdk/integrations/arq.py | 33 ++++++++++++++++++++++-- tests/integrations/arq/test_arq.py | 40 ++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/integrations/arq.py b/sentry_sdk/integrations/arq.py index 130c66939e..0a17655491 100644 --- a/sentry_sdk/integrations/arq.py +++ b/sentry_sdk/integrations/arq.py @@ -1,5 +1,8 @@ from __future__ import absolute_import +import sys + +from sentry_sdk._compat import reraise from sentry_sdk._types import MYPY from sentry_sdk import Hub from sentry_sdk.consts import OP, SENSITIVE_DATA_SUBSTITUTE @@ -7,7 +10,7 @@ from sentry_sdk.integrations import DidNotEnable, Integration from sentry_sdk.integrations.logging import ignore_logger from sentry_sdk.tracing import Transaction, TRANSACTION_SOURCE_TASK -from sentry_sdk.utils import capture_internal_exceptions +from sentry_sdk.utils import capture_internal_exceptions, event_from_exception try: import arq.worker @@ -99,6 +102,25 @@ async def _sentry_run_job(self, job_id, score): Worker.run_job = _sentry_run_job +def _capture_exception(exc_info): + # (ExcInfo) -> None + hub = Hub.current + + if hub.scope.transaction is not None: + if exc_info[0] in ARQ_CONTROL_FLOW_EXCEPTIONS: + hub.scope.transaction.set_status("aborted") + return + + hub.scope.transaction.set_status("internal_error") + + event, hint = event_from_exception( + exc_info, + client_options=hub.client.options if hub.client else None, + mechanism={"type": ArqIntegration.identifier, "handled": False}, + ) + hub.capture_event(event, hint=hint) + + def _make_event_processor(ctx, *args, **kwargs): # type: (dict, *Any, **Any) -> EventProcessor def event_processor(event, hint): @@ -143,7 +165,14 @@ async def _sentry_coroutine(ctx, *args, **kwargs): _make_event_processor({**ctx, "job_name": name}, *args, **kwargs) ) - return await coroutine(ctx, *args, **kwargs) + try: + result = await coroutine(ctx, *args, **kwargs) + except Exception: + exc_info = sys.exc_info() + _capture_exception(exc_info) + reraise(*exc_info) + + return result return _sentry_coroutine diff --git a/tests/integrations/arq/test_arq.py b/tests/integrations/arq/test_arq.py index 7f77c0f82d..d7e0e8af85 100644 --- a/tests/integrations/arq/test_arq.py +++ b/tests/integrations/arq/test_arq.py @@ -97,6 +97,46 @@ async def retry_job(ctx): assert event["extra"]["arq-job"]["retry"] == 2 +@pytest.mark.parametrize("job_fails", [True, False], ids=["error", "success"]) +@pytest.mark.asyncio +async def test_job_transaction(capture_events, init_arq, job_fails): + async def division(_, a, b=0): + return a / b + + division.__qualname__ = division.__name__ + + pool, worker = init_arq([division]) + + events = capture_events() + + job = await pool.enqueue_job("division", 1, b=int(not job_fails)) + await worker.run_job(job.job_id, timestamp_ms()) + + if job_fails: + error_event = events.pop(0) + assert error_event["exception"]["values"][0]["type"] == "ZeroDivisionError" + assert error_event["exception"]["values"][0]["mechanism"]["type"] == "arq" + + (event,) = events + assert event["type"] == "transaction" + assert event["transaction"] == "division" + assert event["transaction_info"] == {"source": "task"} + + if job_fails: + assert event["contexts"]["trace"]["status"] == "internal_error" + else: + assert 
event["contexts"]["trace"]["status"] == "ok" + + assert "arq_task_id" in event["tags"] + assert "arq_task_retry" in event["tags"] + + extra = event["extra"]["arq-job"] + assert extra["task"] == "division" + assert extra["args"] == [1] + assert extra["kwargs"] == {"b": int(not job_fails)} + assert extra["retry"] == 1 + + @pytest.mark.asyncio async def test_enqueue_job(capture_events, init_arq): async def dummy_job(_): From 250aa4fed92530473109773f993af69f8232c0e6 Mon Sep 17 00:00:00 2001 From: Zhenay Date: Sat, 28 Jan 2023 21:03:01 +0300 Subject: [PATCH 05/14] Formatting --- mypy.ini | 2 ++ sentry_sdk/integrations/arq.py | 11 ++++++----- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/mypy.ini b/mypy.ini index 6e8f6b7230..0d12e43280 100644 --- a/mypy.ini +++ b/mypy.ini @@ -65,3 +65,5 @@ ignore_missing_imports = True ignore_missing_imports = True [mypy-huey.*] ignore_missing_imports = True +[mypy-arq.*] +ignore_missing_imports = True diff --git a/sentry_sdk/integrations/arq.py b/sentry_sdk/integrations/arq.py index 0a17655491..ea39aa1129 100644 --- a/sentry_sdk/integrations/arq.py +++ b/sentry_sdk/integrations/arq.py @@ -21,9 +21,9 @@ raise DidNotEnable("Arq is not installed") if MYPY: - from typing import Any, Optional + from typing import Any, Dict, Optional - from sentry_sdk._types import EventProcessor, Event, Hint + from sentry_sdk._types import EventProcessor, Event, ExcInfo, Hint from arq.jobs import Job from arq.typing import WorkerCoroutine @@ -103,7 +103,7 @@ async def _sentry_run_job(self, job_id, score): def _capture_exception(exc_info): - # (ExcInfo) -> None + # type: (ExcInfo) -> None hub = Hub.current if hub.scope.transaction is not None: @@ -122,7 +122,7 @@ def _capture_exception(exc_info): def _make_event_processor(ctx, *args, **kwargs): - # type: (dict, *Any, **Any) -> EventProcessor + # type: (Dict[Any, Any], *Any, **Any) -> EventProcessor def event_processor(event, hint): # type: (Event, Hint) -> Optional[Event] @@ -156,7 +156,7 @@ def event_processor(event, hint): def _wrap_coroutine(name, coroutine): # type: (str, WorkerCoroutine) -> WorkerCoroutine async def _sentry_coroutine(ctx, *args, **kwargs): - # type: (dict, *Any, **Any) -> Any + # type: (Dict[Any, Any], *Any, **Any) -> Any hub = Hub.current if hub.get_integration(ArqIntegration) is None: return await coroutine(*args, **kwargs) @@ -178,6 +178,7 @@ async def _sentry_coroutine(ctx, *args, **kwargs): def patch_func(): + # type: () -> None old_func = arq.worker.func def _sentry_func(*args, **kwargs): From 08d5c9d9bb862a2571bc401abc54525807ccc50d Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Wed, 22 Feb 2023 09:32:46 +0100 Subject: [PATCH 06/14] Initial arq setup --- .github/workflows/test-integration-arq.yml | 73 ++++++++++++++++++++++ sentry_sdk/integrations/arq.py | 27 ++++++++ setup.py | 1 + tests/integrations/arq/__init__.py | 3 + tests/integrations/arq/test_arq.py | 40 ++++++++++++ tox.ini | 8 +++ 6 files changed, 152 insertions(+) create mode 100644 .github/workflows/test-integration-arq.yml create mode 100644 sentry_sdk/integrations/arq.py create mode 100644 tests/integrations/arq/__init__.py create mode 100644 tests/integrations/arq/test_arq.py diff --git a/.github/workflows/test-integration-arq.yml b/.github/workflows/test-integration-arq.yml new file mode 100644 index 0000000000..2eee836bc1 --- /dev/null +++ b/.github/workflows/test-integration-arq.yml @@ -0,0 +1,73 @@ +name: Test arq + +on: + push: + branches: + - master + - release/** + + pull_request: + +# Cancel in progress workflows 
on pull_requests. +# https://docs.github.com/en/actions/using-jobs/using-concurrency#example-using-a-fallback-value +concurrency: + group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} + cancel-in-progress: true + +permissions: + contents: read + +env: + BUILD_CACHE_KEY: ${{ github.sha }} + CACHED_BUILD_PATHS: | + ${{ github.workspace }}/dist-serverless + +jobs: + test: + name: arq, python ${{ matrix.python-version }}, ${{ matrix.os }} + runs-on: ${{ matrix.os }} + timeout-minutes: 45 + + strategy: + fail-fast: false + matrix: + python-version: ["3.7","3.8","3.9","3.10","3.11"] + # python3.6 reached EOL and is no longer being supported on + # new versions of hosted runners on Github Actions + # ubuntu-20.04 is the last version that supported python3.6 + # see https://github.com/actions/setup-python/issues/544#issuecomment-1332535877 + os: [ubuntu-20.04] + + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + + - name: Setup Test Env + run: | + pip install codecov "tox>=3,<4" + + - name: Test arq + timeout-minutes: 45 + shell: bash + run: | + set -x # print commands that are executed + coverage erase + + ./scripts/runtox.sh "${{ matrix.python-version }}-arq" --cov=tests --cov=sentry_sdk --cov-report= --cov-branch + coverage combine .coverage* + coverage xml -i + codecov --file coverage.xml + + check_required_tests: + name: All arq tests passed or skipped + needs: test + # Always run this, even if a dependent job failed + if: always() + runs-on: ubuntu-20.04 + steps: + - name: Check for failures + if: contains(needs.test.result, 'failure') + run: | + echo "One of the dependent jobs have failed. You may need to re-run it." && exit 1 diff --git a/sentry_sdk/integrations/arq.py b/sentry_sdk/integrations/arq.py new file mode 100644 index 0000000000..351d5145fb --- /dev/null +++ b/sentry_sdk/integrations/arq.py @@ -0,0 +1,27 @@ +from __future__ import absolute_import + +from sentry_sdk.integrations import DidNotEnable, Integration + +try: + from arq.version import VERSION as ARQ_VERSION +except ImportError: + raise DidNotEnable("Arq is not installed") + + +class ArqIntegration(Integration): + identifier = "arq" + + @staticmethod + def setup_once(): + # type: () -> None + + try: + if isinstance(ARQ_VERSION, str): + version = tuple(map(int, ARQ_VERSION.split(".")[:2])) + else: + version = ARQ_VERSION.version[:2] + except (TypeError, ValueError): + raise DidNotEnable("arq version unparsable: {}".format(ARQ_VERSION)) + + if version < (0, 23): + raise DidNotEnable("arq 0.23 or newer required.") diff --git a/setup.py b/setup.py index 07756acabc..3a96380a11 100644 --- a/setup.py +++ b/setup.py @@ -53,6 +53,7 @@ def get_file_text(file_name): "celery": ["celery>=3"], "huey": ["huey>=2"], "beam": ["apache-beam>=2.12"], + "arq": ["arq>=0.23"], "rq": ["rq>=0.6"], "aiohttp": ["aiohttp>=3.5"], "tornado": ["tornado>=5"], diff --git a/tests/integrations/arq/__init__.py b/tests/integrations/arq/__init__.py new file mode 100644 index 0000000000..f0b4712255 --- /dev/null +++ b/tests/integrations/arq/__init__.py @@ -0,0 +1,3 @@ +import pytest + +pytest.importorskip("arq") diff --git a/tests/integrations/arq/test_arq.py b/tests/integrations/arq/test_arq.py new file mode 100644 index 0000000000..685ffe529b --- /dev/null +++ b/tests/integrations/arq/test_arq.py @@ -0,0 +1,40 @@ +import pytest + +from sentry_sdk.integrations.arq import ArqIntegration + +from arq.connections import ArqRedis +from arq.worker import Worker + +from 
fakeredis.aioredis import FakeRedis + + +@pytest.fixture(autouse=True) +def patch_fakeredis_info_command(): + from fakeredis._fakesocket import FakeSocket + + if not hasattr(FakeSocket, "info"): + from fakeredis._commands import command + from fakeredis._helpers import SimpleString + + @command((SimpleString,), name="info") + def info(self, section): + return section + + FakeSocket.info = info + + +@pytest.fixture +def init_arq(sentry_init): + def inner(functions): + sentry_init( + integrations=[ArqIntegration()], + traces_sample_rate=1.0, + send_default_pii=True, + debug=True, + ) + + server = FakeRedis() + pool = ArqRedis(pool_or_conn=server.connection_pool) + return pool, Worker(functions, redis_pool=pool) + + return inner diff --git a/tox.ini b/tox.ini index 55af0dfd8c..63dee1cce2 100644 --- a/tox.ini +++ b/tox.ini @@ -22,6 +22,9 @@ envlist = {py3.7}-aiohttp-v{3.5} {py3.7,py3.8,py3.9,py3.10,py3.11}-aiohttp-v{3.6} + # Arq + {py3.7,py3.8,py3.9,py3.10,py3.11}-arq + # Asgi {py3.7,py3.8,py3.9,py3.10,py3.11}-asgi @@ -175,6 +178,10 @@ deps = aiohttp-v3.5: aiohttp>=3.5.0,<3.6.0 aiohttp: pytest-aiohttp + # Arq + arq: arq>=0.23.0 + arq: fakeredis>=2.2.0 + # Asgi asgi: pytest-asyncio asgi: async-asgi-testclient @@ -400,6 +407,7 @@ setenv = PYTHONDONTWRITEBYTECODE=1 TESTPATH=tests aiohttp: TESTPATH=tests/integrations/aiohttp + arq: TESTPATH=tests/integrations/arq asgi: TESTPATH=tests/integrations/asgi aws_lambda: TESTPATH=tests/integrations/aws_lambda beam: TESTPATH=tests/integrations/beam From 600681fe6415d14e8cfd0696adf20dce6a7adce6 Mon Sep 17 00:00:00 2001 From: Zhenay Date: Fri, 27 Jan 2023 21:06:08 +0300 Subject: [PATCH 07/14] Patch enqueue_job --- sentry_sdk/consts.py | 2 ++ sentry_sdk/integrations/arq.py | 33 +++++++++++++++++++++ tests/integrations/arq/test_arq.py | 46 ++++++++++++++++++++++++++++++ 3 files changed, 81 insertions(+) diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py index 2d2b28b9ee..d5c9b19a45 100644 --- a/sentry_sdk/consts.py +++ b/sentry_sdk/consts.py @@ -65,6 +65,8 @@ class OP: MIDDLEWARE_STARLITE = "middleware.starlite" MIDDLEWARE_STARLITE_RECEIVE = "middleware.starlite.receive" MIDDLEWARE_STARLITE_SEND = "middleware.starlite.send" + QUEUE_SUBMIT_ARQ = "queue.submit.arq" + QUEUE_TASK_ARQ = "queue.task.arq" QUEUE_SUBMIT_CELERY = "queue.submit.celery" QUEUE_TASK_CELERY = "queue.task.celery" QUEUE_TASK_RQ = "queue.task.rq" diff --git a/sentry_sdk/integrations/arq.py b/sentry_sdk/integrations/arq.py index 351d5145fb..7dbff695e6 100644 --- a/sentry_sdk/integrations/arq.py +++ b/sentry_sdk/integrations/arq.py @@ -1,12 +1,24 @@ from __future__ import absolute_import +from sentry_sdk._types import MYPY +from sentry_sdk import Hub +from sentry_sdk.consts import OP from sentry_sdk.integrations import DidNotEnable, Integration +from sentry_sdk.integrations.logging import ignore_logger try: from arq.version import VERSION as ARQ_VERSION + from arq.connections import ArqRedis + from arq.jobs import Job + from arq.worker import JobExecutionFailed, Retry, RetryJob except ImportError: raise DidNotEnable("Arq is not installed") +if MYPY: + from typing import Any, Optional + +ARQ_CONTROL_FLOW_EXCEPTIONS = (JobExecutionFailed, Retry, RetryJob) + class ArqIntegration(Integration): identifier = "arq" @@ -25,3 +37,24 @@ def setup_once(): if version < (0, 23): raise DidNotEnable("arq 0.23 or newer required.") + + patch_enqueue_job() + + ignore_logger("arq.worker") + + +def patch_enqueue_job(): + # type: () -> None + old_enqueue_job = ArqRedis.enqueue_job + + async def 
_sentry_enqueue_job(self, function, *args, **kwargs): + # type: (ArqRedis, str, *Any, **Any) -> Optional[Job] + hub = Hub.current + + if hub.get_integration(ArqIntegration) is None: + return await old_enqueue_job(self, function, *args, **kwargs) + + with hub.start_span(op=OP.QUEUE_SUBMIT_ARQ, description=function): + return await old_enqueue_job(self, function, *args, **kwargs) + + ArqRedis.enqueue_job = _sentry_enqueue_job diff --git a/tests/integrations/arq/test_arq.py b/tests/integrations/arq/test_arq.py index 685ffe529b..6797c90514 100644 --- a/tests/integrations/arq/test_arq.py +++ b/tests/integrations/arq/test_arq.py @@ -1,8 +1,11 @@ import pytest +from sentry_sdk import start_transaction from sentry_sdk.integrations.arq import ArqIntegration from arq.connections import ArqRedis +from arq.jobs import Job +from arq.utils import timestamp_ms from arq.worker import Worker from fakeredis.aioredis import FakeRedis @@ -38,3 +41,46 @@ def inner(functions): return pool, Worker(functions, redis_pool=pool) return inner + + +@pytest.mark.asyncio +async def test_job_result(init_arq): + async def increase(ctx, num): + return num + 1 + + increase.__qualname__ = increase.__name__ + + pool, worker = init_arq([increase]) + + job = await pool.enqueue_job("increase", 3) + + assert isinstance(job, Job) + + await worker.run_job(job.job_id, timestamp_ms()) + result = await job.result() + job_result = await job.result_info() + + assert result == 4 + assert job_result.result == 4 + + +@pytest.mark.asyncio +async def test_enqueue_job(capture_events, init_arq): + async def dummy_job(_): + pass + + pool, _ = init_arq([dummy_job]) + + events = capture_events() + + with start_transaction() as transaction: + await pool.enqueue_job("dummy_job") + + (event,) = events + + assert event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert event["contexts"]["trace"]["span_id"] == transaction.span_id + + assert len(event["spans"]) + assert event["spans"][0]["op"] == "queue.submit.arq" + assert event["spans"][0]["description"] == "dummy_job" From cc8d7409af92a78d36db7b19a93a5f5fed3d101b Mon Sep 17 00:00:00 2001 From: Zhenay Date: Sat, 28 Jan 2023 13:21:16 +0300 Subject: [PATCH 08/14] Patch run_job --- sentry_sdk/integrations/arq.py | 115 ++++++++++++++++++++++++++++- tests/integrations/arq/test_arq.py | 39 +++++++++- 2 files changed, 148 insertions(+), 6 deletions(-) diff --git a/sentry_sdk/integrations/arq.py b/sentry_sdk/integrations/arq.py index 7dbff695e6..130c66939e 100644 --- a/sentry_sdk/integrations/arq.py +++ b/sentry_sdk/integrations/arq.py @@ -2,21 +2,30 @@ from sentry_sdk._types import MYPY from sentry_sdk import Hub -from sentry_sdk.consts import OP +from sentry_sdk.consts import OP, SENSITIVE_DATA_SUBSTITUTE +from sentry_sdk.hub import _should_send_default_pii from sentry_sdk.integrations import DidNotEnable, Integration from sentry_sdk.integrations.logging import ignore_logger +from sentry_sdk.tracing import Transaction, TRANSACTION_SOURCE_TASK +from sentry_sdk.utils import capture_internal_exceptions try: + import arq.worker from arq.version import VERSION as ARQ_VERSION from arq.connections import ArqRedis - from arq.jobs import Job - from arq.worker import JobExecutionFailed, Retry, RetryJob + from arq.worker import JobExecutionFailed, Retry, RetryJob, Worker except ImportError: raise DidNotEnable("Arq is not installed") if MYPY: from typing import Any, Optional + from sentry_sdk._types import EventProcessor, Event, Hint + + from arq.jobs import Job + from arq.typing import WorkerCoroutine 
+ from arq.worker import Function + ARQ_CONTROL_FLOW_EXCEPTIONS = (JobExecutionFailed, Retry, RetryJob) @@ -39,6 +48,8 @@ def setup_once(): raise DidNotEnable("arq 0.23 or newer required.") patch_enqueue_job() + patch_run_job() + patch_func() ignore_logger("arq.worker") @@ -58,3 +69,101 @@ async def _sentry_enqueue_job(self, function, *args, **kwargs): return await old_enqueue_job(self, function, *args, **kwargs) ArqRedis.enqueue_job = _sentry_enqueue_job + + +def patch_run_job(): + # type: () -> None + old_run_job = Worker.run_job + + async def _sentry_run_job(self, job_id, score): + # type: (Worker, str, int) -> None + hub = Hub.current + + if hub.get_integration(ArqIntegration) is None: + return await old_run_job(self, job_id, score) + + with hub.push_scope() as scope: + scope._name = "arq" + scope.clear_breadcrumbs() + + transaction = Transaction( + name="unknown arq task", + status="ok", + op=OP.QUEUE_TASK_ARQ, + source=TRANSACTION_SOURCE_TASK, + ) + + with hub.start_transaction(transaction): + return await old_run_job(self, job_id, score) + + Worker.run_job = _sentry_run_job + + +def _make_event_processor(ctx, *args, **kwargs): + # type: (dict, *Any, **Any) -> EventProcessor + def event_processor(event, hint): + # type: (Event, Hint) -> Optional[Event] + + hub = Hub.current + + with capture_internal_exceptions(): + if hub.scope.transaction is not None: + hub.scope.transaction.name = ctx["job_name"] + event["transaction"] = ctx["job_name"] + + tags = event.setdefault("tags", {}) + tags["arq_task_id"] = ctx["job_id"] + tags["arq_task_retry"] = ctx["job_try"] > 1 + extra = event.setdefault("extra", {}) + extra["arq-job"] = { + "task": ctx["job_name"], + "args": args + if _should_send_default_pii() + else SENSITIVE_DATA_SUBSTITUTE, + "kwargs": kwargs + if _should_send_default_pii() + else SENSITIVE_DATA_SUBSTITUTE, + "retry": ctx["job_try"], + } + + return event + + return event_processor + + +def _wrap_coroutine(name, coroutine): + # type: (str, WorkerCoroutine) -> WorkerCoroutine + async def _sentry_coroutine(ctx, *args, **kwargs): + # type: (dict, *Any, **Any) -> Any + hub = Hub.current + if hub.get_integration(ArqIntegration) is None: + return await coroutine(*args, **kwargs) + + hub.scope.add_event_processor( + _make_event_processor({**ctx, "job_name": name}, *args, **kwargs) + ) + + return await coroutine(ctx, *args, **kwargs) + + return _sentry_coroutine + + +def patch_func(): + old_func = arq.worker.func + + def _sentry_func(*args, **kwargs): + # type: (*Any, **Any) -> Function + hub = Hub.current + + if hub.get_integration(ArqIntegration) is None: + return old_func(*args, **kwargs) + + func = old_func(*args, **kwargs) + + if not getattr(func, "_sentry_is_patched", False): + func.coroutine = _wrap_coroutine(func.name, func.coroutine) + func._sentry_is_patched = True + + return func + + arq.worker.func = _sentry_func diff --git a/tests/integrations/arq/test_arq.py b/tests/integrations/arq/test_arq.py index 6797c90514..7f77c0f82d 100644 --- a/tests/integrations/arq/test_arq.py +++ b/tests/integrations/arq/test_arq.py @@ -6,7 +6,7 @@ from arq.connections import ArqRedis from arq.jobs import Job from arq.utils import timestamp_ms -from arq.worker import Worker +from arq.worker import Retry, Worker from fakeredis.aioredis import FakeRedis @@ -28,7 +28,7 @@ def info(self, section): @pytest.fixture def init_arq(sentry_init): - def inner(functions): + def inner(functions, allow_abort_jobs=False): sentry_init( integrations=[ArqIntegration()], traces_sample_rate=1.0, @@ -38,7 +38,9 @@ def 
inner(functions): server = FakeRedis() pool = ArqRedis(pool_or_conn=server.connection_pool) - return pool, Worker(functions, redis_pool=pool) + return pool, Worker( + functions, redis_pool=pool, allow_abort_jobs=allow_abort_jobs + ) return inner @@ -64,6 +66,37 @@ async def increase(ctx, num): assert job_result.result == 4 +@pytest.mark.asyncio +async def test_job_retry(capture_events, init_arq): + async def retry_job(ctx): + if ctx["job_try"] < 2: + raise Retry + + retry_job.__qualname__ = retry_job.__name__ + + pool, worker = init_arq([retry_job]) + + job = await pool.enqueue_job("retry_job") + + events = capture_events() + + await worker.run_job(job.job_id, timestamp_ms()) + + event = events.pop(0) + assert event["contexts"]["trace"]["status"] == "aborted" + assert event["transaction"] == "retry_job" + assert event["tags"]["arq_task_id"] == job.job_id + assert event["extra"]["arq-job"]["retry"] == 1 + + await worker.run_job(job.job_id, timestamp_ms()) + + event = events.pop(0) + assert event["contexts"]["trace"]["status"] == "ok" + assert event["transaction"] == "retry_job" + assert event["tags"]["arq_task_id"] == job.job_id + assert event["extra"]["arq-job"]["retry"] == 2 + + @pytest.mark.asyncio async def test_enqueue_job(capture_events, init_arq): async def dummy_job(_): From b46cd8a621f1ca8fe26d2c1aea036408d4908ed7 Mon Sep 17 00:00:00 2001 From: Zhenay Date: Sat, 28 Jan 2023 20:25:38 +0300 Subject: [PATCH 09/14] Capture exceptions --- sentry_sdk/integrations/arq.py | 33 ++++++++++++++++++++++-- tests/integrations/arq/test_arq.py | 40 ++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/integrations/arq.py b/sentry_sdk/integrations/arq.py index 130c66939e..0a17655491 100644 --- a/sentry_sdk/integrations/arq.py +++ b/sentry_sdk/integrations/arq.py @@ -1,5 +1,8 @@ from __future__ import absolute_import +import sys + +from sentry_sdk._compat import reraise from sentry_sdk._types import MYPY from sentry_sdk import Hub from sentry_sdk.consts import OP, SENSITIVE_DATA_SUBSTITUTE @@ -7,7 +10,7 @@ from sentry_sdk.integrations import DidNotEnable, Integration from sentry_sdk.integrations.logging import ignore_logger from sentry_sdk.tracing import Transaction, TRANSACTION_SOURCE_TASK -from sentry_sdk.utils import capture_internal_exceptions +from sentry_sdk.utils import capture_internal_exceptions, event_from_exception try: import arq.worker @@ -99,6 +102,25 @@ async def _sentry_run_job(self, job_id, score): Worker.run_job = _sentry_run_job +def _capture_exception(exc_info): + # (ExcInfo) -> None + hub = Hub.current + + if hub.scope.transaction is not None: + if exc_info[0] in ARQ_CONTROL_FLOW_EXCEPTIONS: + hub.scope.transaction.set_status("aborted") + return + + hub.scope.transaction.set_status("internal_error") + + event, hint = event_from_exception( + exc_info, + client_options=hub.client.options if hub.client else None, + mechanism={"type": ArqIntegration.identifier, "handled": False}, + ) + hub.capture_event(event, hint=hint) + + def _make_event_processor(ctx, *args, **kwargs): # type: (dict, *Any, **Any) -> EventProcessor def event_processor(event, hint): @@ -143,7 +165,14 @@ async def _sentry_coroutine(ctx, *args, **kwargs): _make_event_processor({**ctx, "job_name": name}, *args, **kwargs) ) - return await coroutine(ctx, *args, **kwargs) + try: + result = await coroutine(ctx, *args, **kwargs) + except Exception: + exc_info = sys.exc_info() + _capture_exception(exc_info) + reraise(*exc_info) + + return result return _sentry_coroutine 
diff --git a/tests/integrations/arq/test_arq.py b/tests/integrations/arq/test_arq.py index 7f77c0f82d..d7e0e8af85 100644 --- a/tests/integrations/arq/test_arq.py +++ b/tests/integrations/arq/test_arq.py @@ -97,6 +97,46 @@ async def retry_job(ctx): assert event["extra"]["arq-job"]["retry"] == 2 +@pytest.mark.parametrize("job_fails", [True, False], ids=["error", "success"]) +@pytest.mark.asyncio +async def test_job_transaction(capture_events, init_arq, job_fails): + async def division(_, a, b=0): + return a / b + + division.__qualname__ = division.__name__ + + pool, worker = init_arq([division]) + + events = capture_events() + + job = await pool.enqueue_job("division", 1, b=int(not job_fails)) + await worker.run_job(job.job_id, timestamp_ms()) + + if job_fails: + error_event = events.pop(0) + assert error_event["exception"]["values"][0]["type"] == "ZeroDivisionError" + assert error_event["exception"]["values"][0]["mechanism"]["type"] == "arq" + + (event,) = events + assert event["type"] == "transaction" + assert event["transaction"] == "division" + assert event["transaction_info"] == {"source": "task"} + + if job_fails: + assert event["contexts"]["trace"]["status"] == "internal_error" + else: + assert event["contexts"]["trace"]["status"] == "ok" + + assert "arq_task_id" in event["tags"] + assert "arq_task_retry" in event["tags"] + + extra = event["extra"]["arq-job"] + assert extra["task"] == "division" + assert extra["args"] == [1] + assert extra["kwargs"] == {"b": int(not job_fails)} + assert extra["retry"] == 1 + + @pytest.mark.asyncio async def test_enqueue_job(capture_events, init_arq): async def dummy_job(_): From 10e83ba1cbc4836afd4663e93009f9934221c7da Mon Sep 17 00:00:00 2001 From: Zhenay Date: Sat, 28 Jan 2023 21:03:01 +0300 Subject: [PATCH 10/14] Formatting --- mypy.ini | 2 ++ sentry_sdk/integrations/arq.py | 11 ++++++----- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/mypy.ini b/mypy.ini index 6e8f6b7230..0d12e43280 100644 --- a/mypy.ini +++ b/mypy.ini @@ -65,3 +65,5 @@ ignore_missing_imports = True ignore_missing_imports = True [mypy-huey.*] ignore_missing_imports = True +[mypy-arq.*] +ignore_missing_imports = True diff --git a/sentry_sdk/integrations/arq.py b/sentry_sdk/integrations/arq.py index 0a17655491..ea39aa1129 100644 --- a/sentry_sdk/integrations/arq.py +++ b/sentry_sdk/integrations/arq.py @@ -21,9 +21,9 @@ raise DidNotEnable("Arq is not installed") if MYPY: - from typing import Any, Optional + from typing import Any, Dict, Optional - from sentry_sdk._types import EventProcessor, Event, Hint + from sentry_sdk._types import EventProcessor, Event, ExcInfo, Hint from arq.jobs import Job from arq.typing import WorkerCoroutine @@ -103,7 +103,7 @@ async def _sentry_run_job(self, job_id, score): def _capture_exception(exc_info): - # (ExcInfo) -> None + # type: (ExcInfo) -> None hub = Hub.current if hub.scope.transaction is not None: @@ -122,7 +122,7 @@ def _capture_exception(exc_info): def _make_event_processor(ctx, *args, **kwargs): - # type: (dict, *Any, **Any) -> EventProcessor + # type: (Dict[Any, Any], *Any, **Any) -> EventProcessor def event_processor(event, hint): # type: (Event, Hint) -> Optional[Event] @@ -156,7 +156,7 @@ def event_processor(event, hint): def _wrap_coroutine(name, coroutine): # type: (str, WorkerCoroutine) -> WorkerCoroutine async def _sentry_coroutine(ctx, *args, **kwargs): - # type: (dict, *Any, **Any) -> Any + # type: (Dict[Any, Any], *Any, **Any) -> Any hub = Hub.current if hub.get_integration(ArqIntegration) is None: return await 
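At this point the series instruments both sides of the queue: `ArqRedis.enqueue_job` is wrapped in a `queue.submit.arq` span on the producer side, and `Worker.run_job` starts a `queue.task.arq` transaction on the consumer side. A minimal producer-side sketch of how an application would opt in (the `download_content` job name and the DSN are placeholders; any Redis settings arq accepts will do):

    import asyncio

    import sentry_sdk
    from sentry_sdk.integrations.arq import ArqIntegration

    from arq import create_pool
    from arq.connections import RedisSettings


    async def main():
        # Registering ArqIntegration patches ArqRedis.enqueue_job, so the
        # enqueue below is recorded as a "queue.submit.arq" span inside
        # the surrounding transaction.
        sentry_sdk.init(
            dsn="...",
            integrations=[ArqIntegration()],
            traces_sample_rate=1.0,
        )

        pool = await create_pool(RedisSettings())
        with sentry_sdk.start_transaction(name="enqueue-demo"):
            await pool.enqueue_job("download_content", "https://example.com")


    asyncio.run(main())
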
From 808f3e2f37bc9a74c5b334eb968eb1c7f2876f22 Mon Sep 17 00:00:00 2001
From: Anton Pirker
Date: Wed, 22 Feb 2023 09:38:55 +0100
Subject: [PATCH 11/14] Linting and fixing import

---
 sentry_sdk/integrations/arq.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/sentry_sdk/integrations/arq.py b/sentry_sdk/integrations/arq.py
index ea39aa1129..9ca1dfb789 100644
--- a/sentry_sdk/integrations/arq.py
+++ b/sentry_sdk/integrations/arq.py
@@ -5,12 +5,16 @@
 from sentry_sdk._compat import reraise
 from sentry_sdk._types import MYPY
 from sentry_sdk import Hub
-from sentry_sdk.consts import OP, SENSITIVE_DATA_SUBSTITUTE
+from sentry_sdk.consts import OP
 from sentry_sdk.hub import _should_send_default_pii
 from sentry_sdk.integrations import DidNotEnable, Integration
 from sentry_sdk.integrations.logging import ignore_logger
 from sentry_sdk.tracing import Transaction, TRANSACTION_SOURCE_TASK
-from sentry_sdk.utils import capture_internal_exceptions, event_from_exception
+from sentry_sdk.utils import (
+    capture_internal_exceptions,
+    event_from_exception,
+    SENSITIVE_DATA_SUBSTITUTE,
+)
 
 try:
     import arq.worker

From 987dd2945b9ee2839d30248b035e794e2d99abed Mon Sep 17 00:00:00 2001
From: Zhenay
Date: Wed, 22 Feb 2023 16:48:42 +0300
Subject: [PATCH 12/14] Add missing requirement

---
 tox.ini | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tox.ini b/tox.ini
index 63dee1cce2..8712769031 100644
--- a/tox.ini
+++ b/tox.ini
@@ -181,6 +181,7 @@ deps =
     # Arq
     arq: arq>=0.23.0
     arq: fakeredis>=2.2.0
+    arq: pytest-asyncio
 
     # Asgi
     asgi: pytest-asyncio

From f7a2302905ad533b9439982da728e0b8a66dd0e5 Mon Sep 17 00:00:00 2001
From: Anton Pirker
Date: Wed, 22 Feb 2023 15:14:46 +0100
Subject: [PATCH 13/14] Fixing too many scopes being popped.

---
 sentry_sdk/integrations/arq.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sentry_sdk/integrations/arq.py b/sentry_sdk/integrations/arq.py
index 9ca1dfb789..0fba2240f1 100644
--- a/sentry_sdk/integrations/arq.py
+++ b/sentry_sdk/integrations/arq.py
@@ -84,12 +84,12 @@ def patch_run_job():
 
     async def _sentry_run_job(self, job_id, score):
         # type: (Worker, str, int) -> None
-        hub = Hub.current
+        hub = Hub(Hub.current)
 
         if hub.get_integration(ArqIntegration) is None:
             return await old_run_job(self, job_id, score)
 
-        with hub.push_scope() as scope:
+        with hub.configure_scope() as scope:
             scope._name = "arq"
             scope.clear_breadcrumbs()

From 9e2de5f921174ef8689b3ab257b10b82b2eed045 Mon Sep 17 00:00:00 2001
From: Anton Pirker
Date: Wed, 22 Feb 2023 15:21:36 +0100
Subject: [PATCH 14/14] If we clone the hub we can also push the scope safely

---
 sentry_sdk/integrations/arq.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sentry_sdk/integrations/arq.py b/sentry_sdk/integrations/arq.py
index 0fba2240f1..195272a4c7 100644
--- a/sentry_sdk/integrations/arq.py
+++ b/sentry_sdk/integrations/arq.py
@@ -89,7 +89,7 @@ async def _sentry_run_job(self, job_id, score):
 
         if hub.get_integration(ArqIntegration) is None:
             return await old_run_job(self, job_id, score)
 
-        with hub.configure_scope() as scope:
+        with hub.push_scope() as scope:
             scope._name = "arq"
             scope.clear_breadcrumbs()
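With the full series applied, a worker needs no Sentry-specific code in its jobs: `arq.worker.func` is patched so every registered coroutine runs inside a `queue.task.arq` transaction named after the job, arq's control-flow exceptions (`Retry`, `RetryJob`, `JobExecutionFailed`) mark the transaction `aborted`, and any other exception is captured with the `arq_task_id` and `arq_task_retry` tags. A worker-side sketch under the same assumptions as above (hypothetical `download_content` task, placeholder DSN):

    import sentry_sdk
    from sentry_sdk.integrations.arq import ArqIntegration

    sentry_sdk.init(
        dsn="...",
        integrations=[ArqIntegration()],
        traces_sample_rate=1.0,
    )


    async def download_content(ctx, url):
        # Runs inside a "queue.task.arq" transaction named "download_content".
        # An unhandled exception here is reported with mechanism type "arq".
        if not url.startswith("https://"):
            raise ValueError("expected an https URL")
        return url


    class WorkerSettings:
        functions = [download_content]

Started with arq's own CLI (`arq <module>.WorkerSettings`), the worker registers `functions` through the patched `func()` wrapper, which is what produces the per-job transactions, tags, and `arq-job` extra shown in the tests.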