Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions .github/workflows/test-integration-arq.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
name: Test arq

on:
push:
branches:
- master
- release/**

pull_request:

# Cancel in progress workflows on pull_requests.
# https://docs.github.com/en/actions/using-jobs/using-concurrency#example-using-a-fallback-value
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true

permissions:
contents: read

env:
BUILD_CACHE_KEY: ${{ github.sha }}
CACHED_BUILD_PATHS: |
${{ github.workspace }}/dist-serverless

jobs:
test:
name: arq, python ${{ matrix.python-version }}, ${{ matrix.os }}
runs-on: ${{ matrix.os }}
timeout-minutes: 45

strategy:
fail-fast: false
matrix:
python-version: ["3.7","3.8","3.9","3.10","3.11"]
# python3.6 reached EOL and is no longer being supported on
# new versions of hosted runners on Github Actions
# ubuntu-20.04 is the last version that supported python3.6
# see https://github.com/actions/setup-python/issues/544#issuecomment-1332535877
os: [ubuntu-20.04]

steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}

- name: Setup Test Env
run: |
pip install codecov "tox>=3,<4"

- name: Test arq
timeout-minutes: 45
shell: bash
run: |
set -x # print commands that are executed
coverage erase

./scripts/runtox.sh "${{ matrix.python-version }}-arq" --cov=tests --cov=sentry_sdk --cov-report= --cov-branch
coverage combine .coverage*
coverage xml -i
codecov --file coverage.xml

check_required_tests:
name: All arq tests passed or skipped
needs: test
# Always run this, even if a dependent job failed
if: always()
runs-on: ubuntu-20.04
steps:
- name: Check for failures
if: contains(needs.test.result, 'failure')
run: |
echo "One of the dependent jobs have failed. You may need to re-run it." && exit 1
2 changes: 2 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,5 @@ ignore_missing_imports = True
ignore_missing_imports = True
[mypy-huey.*]
ignore_missing_imports = True
[mypy-arq.*]
ignore_missing_imports = True
2 changes: 2 additions & 0 deletions sentry_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ class OP:
MIDDLEWARE_STARLITE = "middleware.starlite"
MIDDLEWARE_STARLITE_RECEIVE = "middleware.starlite.receive"
MIDDLEWARE_STARLITE_SEND = "middleware.starlite.send"
QUEUE_SUBMIT_ARQ = "queue.submit.arq"
QUEUE_TASK_ARQ = "queue.task.arq"
QUEUE_SUBMIT_CELERY = "queue.submit.celery"
QUEUE_TASK_CELERY = "queue.task.celery"
QUEUE_TASK_RQ = "queue.task.rq"
Expand Down
203 changes: 203 additions & 0 deletions sentry_sdk/integrations/arq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
from __future__ import absolute_import

import sys

from sentry_sdk._compat import reraise
from sentry_sdk._types import MYPY
from sentry_sdk import Hub
from sentry_sdk.consts import OP
from sentry_sdk.hub import _should_send_default_pii
from sentry_sdk.integrations import DidNotEnable, Integration
from sentry_sdk.integrations.logging import ignore_logger
from sentry_sdk.tracing import Transaction, TRANSACTION_SOURCE_TASK
from sentry_sdk.utils import (
capture_internal_exceptions,
event_from_exception,
SENSITIVE_DATA_SUBSTITUTE,
)

try:
import arq.worker
from arq.version import VERSION as ARQ_VERSION
from arq.connections import ArqRedis
from arq.worker import JobExecutionFailed, Retry, RetryJob, Worker
except ImportError:
raise DidNotEnable("Arq is not installed")

if MYPY:
from typing import Any, Dict, Optional

from sentry_sdk._types import EventProcessor, Event, ExcInfo, Hint

from arq.jobs import Job
from arq.typing import WorkerCoroutine
from arq.worker import Function

ARQ_CONTROL_FLOW_EXCEPTIONS = (JobExecutionFailed, Retry, RetryJob)


class ArqIntegration(Integration):
identifier = "arq"

@staticmethod
def setup_once():
# type: () -> None

try:
if isinstance(ARQ_VERSION, str):
version = tuple(map(int, ARQ_VERSION.split(".")[:2]))
else:
version = ARQ_VERSION.version[:2]
except (TypeError, ValueError):
raise DidNotEnable("arq version unparsable: {}".format(ARQ_VERSION))

if version < (0, 23):
raise DidNotEnable("arq 0.23 or newer required.")

patch_enqueue_job()
patch_run_job()
patch_func()

ignore_logger("arq.worker")


def patch_enqueue_job():
# type: () -> None
old_enqueue_job = ArqRedis.enqueue_job

async def _sentry_enqueue_job(self, function, *args, **kwargs):
# type: (ArqRedis, str, *Any, **Any) -> Optional[Job]
hub = Hub.current

if hub.get_integration(ArqIntegration) is None:
return await old_enqueue_job(self, function, *args, **kwargs)

with hub.start_span(op=OP.QUEUE_SUBMIT_ARQ, description=function):
return await old_enqueue_job(self, function, *args, **kwargs)

ArqRedis.enqueue_job = _sentry_enqueue_job


def patch_run_job():
# type: () -> None
old_run_job = Worker.run_job

async def _sentry_run_job(self, job_id, score):
# type: (Worker, str, int) -> None
hub = Hub(Hub.current)

if hub.get_integration(ArqIntegration) is None:
return await old_run_job(self, job_id, score)

with hub.push_scope() as scope:
scope._name = "arq"
scope.clear_breadcrumbs()

transaction = Transaction(
name="unknown arq task",
status="ok",
op=OP.QUEUE_TASK_ARQ,
source=TRANSACTION_SOURCE_TASK,
)

with hub.start_transaction(transaction):
return await old_run_job(self, job_id, score)

Worker.run_job = _sentry_run_job


def _capture_exception(exc_info):
# type: (ExcInfo) -> None
hub = Hub.current

if hub.scope.transaction is not None:
if exc_info[0] in ARQ_CONTROL_FLOW_EXCEPTIONS:
hub.scope.transaction.set_status("aborted")
return

hub.scope.transaction.set_status("internal_error")

event, hint = event_from_exception(
exc_info,
client_options=hub.client.options if hub.client else None,
mechanism={"type": ArqIntegration.identifier, "handled": False},
)
hub.capture_event(event, hint=hint)


def _make_event_processor(ctx, *args, **kwargs):
# type: (Dict[Any, Any], *Any, **Any) -> EventProcessor
def event_processor(event, hint):
# type: (Event, Hint) -> Optional[Event]

hub = Hub.current

with capture_internal_exceptions():
if hub.scope.transaction is not None:
hub.scope.transaction.name = ctx["job_name"]
event["transaction"] = ctx["job_name"]

tags = event.setdefault("tags", {})
tags["arq_task_id"] = ctx["job_id"]
tags["arq_task_retry"] = ctx["job_try"] > 1
extra = event.setdefault("extra", {})
extra["arq-job"] = {
"task": ctx["job_name"],
"args": args
if _should_send_default_pii()
else SENSITIVE_DATA_SUBSTITUTE,
"kwargs": kwargs
if _should_send_default_pii()
else SENSITIVE_DATA_SUBSTITUTE,
"retry": ctx["job_try"],
}

return event

return event_processor


def _wrap_coroutine(name, coroutine):
# type: (str, WorkerCoroutine) -> WorkerCoroutine
async def _sentry_coroutine(ctx, *args, **kwargs):
# type: (Dict[Any, Any], *Any, **Any) -> Any
hub = Hub.current
if hub.get_integration(ArqIntegration) is None:
return await coroutine(*args, **kwargs)

hub.scope.add_event_processor(
_make_event_processor({**ctx, "job_name": name}, *args, **kwargs)
)

try:
result = await coroutine(ctx, *args, **kwargs)
except Exception:
exc_info = sys.exc_info()
_capture_exception(exc_info)
reraise(*exc_info)

return result

return _sentry_coroutine


def patch_func():
# type: () -> None
old_func = arq.worker.func

def _sentry_func(*args, **kwargs):
# type: (*Any, **Any) -> Function
hub = Hub.current

if hub.get_integration(ArqIntegration) is None:
return old_func(*args, **kwargs)

func = old_func(*args, **kwargs)

if not getattr(func, "_sentry_is_patched", False):
func.coroutine = _wrap_coroutine(func.name, func.coroutine)
func._sentry_is_patched = True

return func

arq.worker.func = _sentry_func
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def get_file_text(file_name):
"celery": ["celery>=3"],
"huey": ["huey>=2"],
"beam": ["apache-beam>=2.12"],
"arq": ["arq>=0.23"],
"rq": ["rq>=0.6"],
"aiohttp": ["aiohttp>=3.5"],
"tornado": ["tornado>=5"],
Expand Down
3 changes: 3 additions & 0 deletions tests/integrations/arq/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import pytest

pytest.importorskip("arq")
Loading