From 9f24136a4eaf618a683ed63a78498fb6995036e4 Mon Sep 17 00:00:00 2001 From: srothh Date: Mon, 14 Jul 2025 14:18:36 +0200 Subject: [PATCH 01/21] ref(transport): Add abstract base class for worker implementation Add an abstract base class for implementations of the background worker. This was done to provide a shared interface for the current implementation of a threaded worker in the sync context as well as the upcoming async task-based worker implementation. GH-4578 --- sentry_sdk/worker.py | 44 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index d911e15623..510376f381 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -1,4 +1,5 @@ from __future__ import annotations +from abc import ABC, abstractmethod import os import threading @@ -16,7 +17,48 @@ _TERMINATOR = object() -class BackgroundWorker: +class Worker(ABC): + """ + Base class for all workers. + + A worker is used to process events in the background and send them to Sentry. + """ + + @property + @abstractmethod + def is_alive(self) -> bool: + pass + + @abstractmethod + def kill(self) -> None: + pass + + @abstractmethod + def flush( + self, timeout: float, callback: Optional[Callable[[int, float], None]] = None + ) -> None: + """ + Flush the worker. + + This method blocks until the worker has flushed all events or the specified timeout is reached. + """ + pass + + @abstractmethod + def full(self) -> bool: + pass + + @abstractmethod + def submit(self, callback: Callable[[], None]) -> bool: + """ + Schedule a callback to be executed by the worker. + + Returns True if the callback was scheduled, False if the queue is full. + """ + pass + + +class BackgroundWorker(Worker): def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None: self._queue: Queue = Queue(queue_size) self._lock = threading.Lock() From 001f36cbf4cf4f8c40ee49e318f88911eaba7fbd Mon Sep 17 00:00:00 2001 From: srothh Date: Mon, 14 Jul 2025 14:23:23 +0200 Subject: [PATCH 02/21] ref(transport): Add _create_worker factory method to Transport Add a new factory method instead of direct instantiation of the threaded background worker. This allows for easy extension to other types of workers, such as the upcoming task-based async worker.
GH-4578 --- sentry_sdk/transport.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index d612028250..f8328cac12 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -28,7 +28,7 @@ from sentry_sdk.consts import EndpointType from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions -from sentry_sdk.worker import BackgroundWorker +from sentry_sdk.worker import BackgroundWorker, Worker from sentry_sdk.envelope import Envelope, Item, PayloadRef from typing import TYPE_CHECKING @@ -173,7 +173,7 @@ def __init__(self: Self, options: Dict[str, Any]) -> None: Transport.__init__(self, options) assert self.parsed_dsn is not None self.options: Dict[str, Any] = options - self._worker = BackgroundWorker(queue_size=options["transport_queue_size"]) + self._worker = self._create_worker(options) self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION) self._disabled_until: Dict[Optional[str], datetime] = {} # We only use this Retry() class for the `get_retry_after` method it exposes @@ -224,6 +224,10 @@ def __init__(self: Self, options: Dict[str, Any]) -> None: elif self._compression_algo == "br": self._compression_level = 4 + def _create_worker(self: Self, options: Dict[str, Any]) -> Worker: + # For now, we only support the threaded sync background worker. + return BackgroundWorker(queue_size=options["transport_queue_size"]) + def record_lost_event( self: Self, reason: str, From 401b1bcedc0f92322234f23cb45bc2a49715aec1 Mon Sep 17 00:00:00 2001 From: srothh Date: Thu, 17 Jul 2025 11:55:26 +0200 Subject: [PATCH 03/21] ref(worker): Add flush_async method to Worker ABC Add a new flush_async method to worker ABC. This is necessary because the async transport cannot use a synchronous blocking flush. GH-4578 --- sentry_sdk/worker.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index 510376f381..f37f920fe3 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -33,7 +33,6 @@ def is_alive(self) -> bool: def kill(self) -> None: pass - @abstractmethod def flush( self, timeout: float, callback: Optional[Callable[[int, float], None]] = None ) -> None: @@ -41,8 +40,22 @@ def flush( Flush the worker. This method blocks until the worker has flushed all events or the specified timeout is reached. + Default implementation is a no-op, since this method may only be relevant to some workers. + Subclasses should override this method if necessary. """ - pass + return None + + async def flush_async( + self, timeout: float, callback: Optional[Callable[[int, float], None]] = None + ) -> None: + """ + Flush the worker. + + This method can be awaited until the worker has flushed all events or the specified timeout is reached. + Default implementation is a no-op, since this method may only be relevant to some workers. + Subclasses should override this method if necessary. + """ + return None @abstractmethod def full(self) -> bool: From 3f43d8fc0f464a10d568d86bf94ee0ec1346a2ab Mon Sep 17 00:00:00 2001 From: srothh Date: Thu, 17 Jul 2025 12:43:04 +0200 Subject: [PATCH 04/21] ref(worker): Move worker flush_async from Worker ABC Move the flush_async down to the concrete subclass to not break existing testing. This makes sense, as this will only really be needed by the async worker anyway and therefore is not shared logic. 
GH-4578 --- sentry_sdk/worker.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index f37f920fe3..200a9ea914 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -45,18 +45,6 @@ def flush( """ return None - async def flush_async( - self, timeout: float, callback: Optional[Callable[[int, float], None]] = None - ) -> None: - """ - Flush the worker. - - This method can be awaited until the worker has flushed all events or the specified timeout is reached. - Default implementation is a no-op, since this method may only be relevant to some workers. - Subclasses should override this method if necessary. - """ - return None - @abstractmethod def full(self) -> bool: pass From 15fa295611edd059d79f2a6e0aedeb5db1707ca2 Mon Sep 17 00:00:00 2001 From: srothh Date: Thu, 17 Jul 2025 14:28:16 +0200 Subject: [PATCH 05/21] ref(worker): Amend function signature for coroutines Coroutines have a return value; however, the current function signature for the worker methods does not accommodate this. Therefore, the signature was changed. GH-4578 --- sentry_sdk/worker.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index 200a9ea914..7325455f8f 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -34,7 +34,7 @@ def kill(self) -> None: pass def flush( - self, timeout: float, callback: Optional[Callable[[int, float], None]] = None + self, timeout: float, callback: Optional[Callable[[int, float], Any]] = None ) -> None: """ Flush the worker. @@ -50,7 +50,7 @@ def full(self) -> bool: pass @abstractmethod - def submit(self, callback: Callable[[], None]) -> bool: + def submit(self, callback: Callable[[], Any]) -> bool: """ Schedule a callback to be executed by the worker. @@ -149,7 +149,7 @@ def _wait_flush(self, timeout: float, callback: Optional[Any]) -> None: pending = self._queue.qsize() + 1 logger.error("flush timed out, dropped %s events", pending) - def submit(self, callback: Callable[[], None]) -> bool: + def submit(self, callback: Callable[[], Any]) -> bool: self._ensure_thread() try: self._queue.put_nowait(callback) From ef780f341f994f3230e95c488f53945296e5adb8 Mon Sep 17 00:00:00 2001 From: srothh Date: Thu, 24 Jul 2025 15:20:55 +0200 Subject: [PATCH 06/21] ref(worker): Add missing docstrings to worker ABC GH-4578 --- sentry_sdk/worker.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index 7325455f8f..555539dc3a 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -27,10 +27,21 @@ class Worker(ABC): @property @abstractmethod def is_alive(self) -> bool: + """ + Checks whether the worker is alive and running. + + Returns True if the worker is alive, False otherwise. + """ pass @abstractmethod def kill(self) -> None: + """ + Kills the worker. + + This method is used to kill the worker. The queue will be drained up to the point where the worker is killed. + The worker will not be able to process any more events. + """ pass def flush( @@ -47,6 +58,11 @@ def flush( """ return None @abstractmethod def full(self) -> bool: + """ + Checks whether the worker's queue is full. + + Returns True if the queue is full, False otherwise.
+ """ pass @abstractmethod From f63e46fabdeffa4a59dc04e71f068f09be79ebf3 Mon Sep 17 00:00:00 2001 From: srothh Date: Thu, 17 Jul 2025 11:41:16 +0200 Subject: [PATCH 07/21] feat(transport): Add an async task-based worker for transport Add a new implementation of the worker interface, implementing the worker as an async task. This is to be used by the upcoming async transport. GH-4581 --- sentry_sdk/worker.py | 92 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index 555539dc3a..d74e1ca2ce 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -2,6 +2,7 @@ from abc import ABC, abstractmethod import os import threading +import asyncio from time import sleep, time from sentry_sdk._queue import Queue, FullError @@ -186,3 +187,94 @@ def _target(self) -> None: finally: self._queue.task_done() sleep(0) + + +class AsyncWorker(Worker): + def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None: + self._queue: asyncio.Queue = asyncio.Queue(queue_size) + self._task: Optional[asyncio.Task] = None + # Event loop needs to remain in the same process + self._task_for_pid: Optional[int] = None + self._loop: Optional[asyncio.AbstractEventLoop] = None + + @property + def is_alive(self) -> bool: + if self._task_for_pid != os.getpid(): + return False + if not self._task or not self._loop: + return False + return self._loop.is_running() and not self._task.done() + + def kill(self) -> None: + if self._task: + self._task.cancel() + self._task = None + self._task_for_pid = None + + def start(self) -> None: + if not self.is_alive: + try: + self._loop = asyncio.get_running_loop() + self._task = self._loop.create_task(self._target()) + self._task_for_pid = os.getpid() + except RuntimeError: + # There is no event loop running + self._loop = None + self._task = None + self._task_for_pid = None + + def full(self) -> bool: + return self._queue.full() + + def _ensure_task(self) -> None: + if not self.is_alive: + self.start() + + async def _wait_flush(self, timeout: float, callback: Optional[Any] = None) -> None: + if not self._loop or not self._loop.is_running(): + return + + initial_timeout = min(0.1, timeout) + + # Timeout on the join + try: + await asyncio.wait_for(self._queue.join(), timeout=initial_timeout) + except asyncio.TimeoutError: + pending = self._queue.qsize() + 1 + logger.debug("%d event(s) pending on flush", pending) + if callback is not None: + callback(pending, timeout) + + try: + remaining_timeout = timeout - initial_timeout + await asyncio.wait_for(self._queue.join(), timeout=remaining_timeout) + except asyncio.TimeoutError: + pending = self._queue.qsize() + 1 + logger.error("flush timed out, dropped %s events", pending) + + async def flush(self, timeout: float, callback: Optional[Any] = None) -> None: + logger.debug("background worker got flush request") + if self.is_alive and timeout > 0.0: + await self._wait_flush(timeout, callback) + logger.debug("background worker flushed") + + def submit(self, callback: Callable[[], None]) -> bool: + self._ensure_task() + + try: + self._queue.put_nowait(callback) + return True + except asyncio.QueueFull: + return False + + async def _target(self) -> None: + while True: + callback = await self._queue.get() + try: + callback() + except Exception: + logger.error("Failed processing job", exc_info=True) + finally: + self._queue.task_done() + # Yield to let the event loop run other tasks + await asyncio.sleep(0) From 18042718e28908218f66a98ac1f1d4d8a0e877e9 Mon Sep 17 
00:00:00 2001 From: srothh Date: Thu, 17 Jul 2025 11:58:20 +0200 Subject: [PATCH 08/21] ref(worker): Make worker work with new ABC interface Refactor the flush method in the async worker into the flush_async coroutine. GH-4581 --- sentry_sdk/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index d74e1ca2ce..c3e596185e 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -252,7 +252,7 @@ async def _wait_flush(self, timeout: float, callback: Optional[Any] = None) -> N pending = self._queue.qsize() + 1 logger.error("flush timed out, dropped %s events", pending) - async def flush(self, timeout: float, callback: Optional[Any] = None) -> None: + async def flush_async(self, timeout: float, callback: Optional[Any] = None) -> None: logger.debug("background worker got flush request") if self.is_alive and timeout > 0.0: await self._wait_flush(timeout, callback) From 11da869c06d3774026ec3f9d1010a091cbec05b3 Mon Sep 17 00:00:00 2001 From: srothh Date: Thu, 17 Jul 2025 14:00:33 +0200 Subject: [PATCH 09/21] fix(worker): Check if callbacks from worker queue are coroutines or functions Add a check to see whether callbacks are awaitable coroutines or functions, as coroutines need to be awaited. GH-4581 --- sentry_sdk/worker.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index c3e596185e..5dce91953e 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -3,6 +3,7 @@ import os import threading import asyncio +import inspect from time import sleep, time from sentry_sdk._queue import Queue, FullError @@ -271,7 +272,12 @@ async def _target(self) -> None: while True: callback = await self._queue.get() try: - callback() + if inspect.iscoroutinefunction(callback): + # Callback is an async coroutine, need to await it + await callback() + else: + # Callback is a sync function, need to call it + callback() except Exception: logger.error("Failed processing job", exc_info=True) finally: From 779a0d6ac83f7192b9fbf31339c716ddab675a30 Mon Sep 17 00:00:00 2001 From: srothh Date: Thu, 17 Jul 2025 14:21:58 +0200 Subject: [PATCH 10/21] ref(worker): Amend return type of submit and flush to accommodate coroutines Coroutines do not return None; therefore, it is necessary to account for this in the callback parameter of the worker. Previously, only callbacks with return type None were accepted.
GH-4581 --- sentry_sdk/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index 5dce91953e..aa9c5bf1c1 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -259,7 +259,7 @@ async def flush_async(self, timeout: float, callback: Optional[Any] = None) -> N await self._wait_flush(timeout, callback) logger.debug("background worker flushed") - def submit(self, callback: Callable[[], None]) -> bool: + def submit(self, callback: Callable[[], Any]) -> bool: self._ensure_task() try: From 0895d234cf93c404d850104e9a492a69be76e61f Mon Sep 17 00:00:00 2001 From: srothh Date: Thu, 17 Jul 2025 15:53:20 +0200 Subject: [PATCH 11/21] ref(worker): Add type parameters for AsyncWorker variables GH-4581 --- sentry_sdk/worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index aa9c5bf1c1..90813e8544 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -192,8 +192,8 @@ def _target(self) -> None: class AsyncWorker(Worker): def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None: - self._queue: asyncio.Queue = asyncio.Queue(queue_size) - self._task: Optional[asyncio.Task] = None + self._queue: asyncio.Queue[Callable[[], Any]] = asyncio.Queue(queue_size) + self._task: Optional[asyncio.Task[None]] = None # Event loop needs to remain in the same process self._task_for_pid: Optional[int] = None self._loop: Optional[asyncio.AbstractEventLoop] = None From bbf426bee0662be8f02ecc590808a8380ad4b7f5 Mon Sep 17 00:00:00 2001 From: srothh Date: Thu, 17 Jul 2025 16:05:04 +0200 Subject: [PATCH 12/21] ref(worker): Remove loop upon killing worker GH-4581 --- sentry_sdk/worker.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index 90813e8544..f4ae864d4d 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -211,6 +211,7 @@ def kill(self) -> None: self._task.cancel() self._task = None self._task_for_pid = None + self._loop = None def start(self) -> None: if not self.is_alive: From 744dc8acbe9a71e26953259802676672c11c569e Mon Sep 17 00:00:00 2001 From: srothh Date: Fri, 18 Jul 2025 11:59:30 +0200 Subject: [PATCH 13/21] feat(worker): Enable concurrent callbacks on async task worker Enable concurrent callbacks on async task worker by firing them as a task rather than awaiting them. A done callback handles the necessary queue and exception logic. 
GH-4581 --- sentry_sdk/worker.py | 44 +++++++++++++++++++++++++++++++++----------- 1 file changed, 33 insertions(+), 11 deletions(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index f4ae864d4d..c12e73c583 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -197,6 +197,8 @@ def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None: # Event loop needs to remain in the same process self._task_for_pid: Optional[int] = None self._loop: Optional[asyncio.AbstractEventLoop] = None + # Track active callback tasks so they have a strong reference and can be cancelled on kill + self._active_tasks: set[asyncio.Task] = set() @property def is_alive(self) -> bool: @@ -211,6 +213,12 @@ def kill(self) -> None: self._task.cancel() self._task = None self._task_for_pid = None + # Also cancel any active callback tasks + # Avoid modifying the set while cancelling tasks + tasks_to_cancel = set(self._active_tasks) + for task in tasks_to_cancel: + task.cancel() + self._active_tasks.clear() self._loop = None def start(self) -> None: @@ -272,16 +280,30 @@ def submit(self, callback: Callable[[], Any]) -> bool: async def _target(self) -> None: while True: callback = await self._queue.get() - try: - if inspect.iscoroutinefunction(callback): - # Callback is an async coroutine, need to await it - await callback() - else: - # Callback is a sync function, need to call it - callback() - except Exception: - logger.error("Failed processing job", exc_info=True) - finally: - self._queue.task_done() + # Firing tasks instead of awaiting them allows for concurrent requests + task = asyncio.create_task(self._process_callback(callback)) + # Create a strong reference to the task so it can be cancelled on kill + # and does not get garbage collected while running + self._active_tasks.add(task) + task.add_done_callback(self._on_task_complete) # Yield to let the event loop run other tasks await asyncio.sleep(0) + + async def _process_callback(self, callback: Callable[[], Any]) -> None: + if inspect.iscoroutinefunction(callback): + # Callback is an async coroutine, need to await it + await callback() + else: + # Callback is a sync function, need to call it + callback() + + def _on_task_complete(self, task: asyncio.Task[None]) -> None: + try: + task.result() + except Exception: + logger.error("Failed processing job", exc_info=True) + finally: + # Mark the task as done and remove it from the active tasks set + # This happens only after the task has completed + self._queue.task_done() + self._active_tasks.discard(task) From fcc8040c31675cae02de137958da95634dedac4b Mon Sep 17 00:00:00 2001 From: srothh Date: Fri, 18 Jul 2025 12:21:22 +0200 Subject: [PATCH 14/21] fix(worker): Modify kill behaviour to mirror threaded worker Changed kill to also use the _TERMINATOR sentinel, so the queue is still drained to this point on kill instead of cancelled immediately. This should also fix potential race conditions with flush_async. 
GH-4581 --- sentry_sdk/worker.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index c12e73c583..91673d7859 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -192,7 +192,7 @@ def _target(self) -> None: class AsyncWorker(Worker): def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None: - self._queue: asyncio.Queue[Callable[[], Any]] = asyncio.Queue(queue_size) + self._queue: asyncio.Queue[Any] = asyncio.Queue(queue_size) self._task: Optional[asyncio.Task[None]] = None # Event loop needs to remain in the same process self._task_for_pid: Optional[int] = None @@ -210,9 +210,10 @@ def is_alive(self) -> bool: def kill(self) -> None: if self._task: - self._task.cancel() - self._task = None - self._task_for_pid = None + try: + self._queue.put_nowait(_TERMINATOR) + except asyncio.QueueFull: + logger.debug("async worker queue full, kill failed") # Also cancel any active callback tasks # Avoid modifying the set while cancelling tasks tasks_to_cancel = set(self._active_tasks) @@ -220,6 +221,8 @@ def kill(self) -> None: task.cancel() self._active_tasks.clear() self._loop = None + self._task = None + self._task_for_pid = None def start(self) -> None: if not self.is_alive: @@ -280,6 +283,9 @@ def submit(self, callback: Callable[[], Any]) -> bool: async def _target(self) -> None: while True: callback = await self._queue.get() + if callback is _TERMINATOR: + self._queue.task_done() + break # Firing tasks instead of awaiting them allows for concurrent requests task = asyncio.create_task(self._process_callback(callback)) # Create a strong reference to the task so it can be cancelled on kill From 9a43d9b7a07b4a5785d79c7d7c5d6a2ad3c5559d Mon Sep 17 00:00:00 2001 From: srothh Date: Mon, 21 Jul 2025 10:17:32 +0200 Subject: [PATCH 15/21] ref(worker): add proper type annotation to worker task list Add proper type annotation to worker task list to fix linting problems GH-4581 --- sentry_sdk/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index 91673d7859..3491498b56 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -198,7 +198,7 @@ def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None: self._task_for_pid: Optional[int] = None self._loop: Optional[asyncio.AbstractEventLoop] = None # Track active callback tasks so they have a strong reference and can be cancelled on kill - self._active_tasks: set[asyncio.Task] = set() + self._active_tasks: set[asyncio.Task[None]] = set() @property def is_alive(self) -> bool: From b5eda0e78ed8599c126c7e19def0749cd2fe0da5 Mon Sep 17 00:00:00 2001 From: srothh Date: Wed, 30 Jul 2025 14:07:24 +0200 Subject: [PATCH 16/21] ref(worker): Refactor implementation to incorporate feedback Refactor worker implementation to simplify callback processing, fix pending calculation and improve queue initialisation. 
GH-4581 --- sentry_sdk/worker.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index 3491498b56..8f4625511a 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -3,7 +3,6 @@ import os import threading import asyncio -import inspect from time import sleep, time from sentry_sdk._queue import Queue, FullError @@ -192,7 +191,7 @@ def _target(self) -> None: class AsyncWorker(Worker): def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None: - self._queue: asyncio.Queue[Any] = asyncio.Queue(queue_size) + self._queue: asyncio.Queue[Any] = None self._task: Optional[asyncio.Task[None]] = None # Event loop needs to remain in the same process self._task_for_pid: Optional[int] = None @@ -228,10 +227,13 @@ def start(self) -> None: if not self.is_alive: try: self._loop = asyncio.get_running_loop() + if self._queue is None: + self._queue = asyncio.Queue(maxsize=self._queue_size) self._task = self._loop.create_task(self._target()) self._task_for_pid = os.getpid() except RuntimeError: # There is no event loop running + logger.warning("No event loop running, async worker not started") self._loop = None self._task = None self._task_for_pid = None @@ -253,7 +255,7 @@ async def _wait_flush(self, timeout: float, callback: Optional[Any] = None) -> N try: await asyncio.wait_for(self._queue.join(), timeout=initial_timeout) except asyncio.TimeoutError: - pending = self._queue.qsize() + 1 + pending = self._queue.qsize() + len(self._active_tasks) logger.debug("%d event(s) pending on flush", pending) if callback is not None: callback(pending, timeout) @@ -262,7 +264,7 @@ async def _wait_flush(self, timeout: float, callback: Optional[Any] = None) -> N remaining_timeout = timeout - initial_timeout await asyncio.wait_for(self._queue.join(), timeout=remaining_timeout) except asyncio.TimeoutError: - pending = self._queue.qsize() + 1 + pending = self._queue.qsize() + len(self._active_tasks) logger.error("flush timed out, dropped %s events", pending) async def flush_async(self, timeout: float, callback: Optional[Any] = None) -> None: @@ -296,12 +298,8 @@ async def _target(self) -> None: await asyncio.sleep(0) async def _process_callback(self, callback: Callable[[], Any]) -> None: - if inspect.iscoroutinefunction(callback): - # Callback is an async coroutine, need to await it - await callback() - else: - # Callback is a sync function, need to call it - callback() + # Callback is an async coroutine, need to await it + await callback() def _on_task_complete(self, task: asyncio.Task[None]) -> None: try: From 9e380b89f860d09d44ba7e087a8a3809804b2745 Mon Sep 17 00:00:00 2001 From: srothh Date: Wed, 30 Jul 2025 14:22:23 +0200 Subject: [PATCH 17/21] ref(worker): fix queue initialization GH-4581 --- sentry_sdk/worker.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index 8f4625511a..e9ae58063d 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -191,7 +191,8 @@ def _target(self) -> None: class AsyncWorker(Worker): def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None: - self._queue: asyncio.Queue[Any] = None + self._queue: asyncio.Queue[Any] = asyncio.Queue(maxsize=queue_size) + self._queue_size = queue_size self._task: Optional[asyncio.Task[None]] = None # Event loop needs to remain in the same process self._task_for_pid: Optional[int] = None From ee446215262b17987d418d0b480ebab863a559cd Mon Sep 17 00:00:00 2001 From: srothh Date: Wed, 30 Jul 
2025 14:50:29 +0200 Subject: [PATCH 18/21] ref(worker): Add queue as optional to allow for initialisation in start GH-4581 --- sentry_sdk/worker.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index e9ae58063d..5d620c4b83 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -191,7 +191,7 @@ def _target(self) -> None: class AsyncWorker(Worker): def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None: - self._queue: asyncio.Queue[Any] = asyncio.Queue(maxsize=queue_size) + self._queue: Optional[asyncio.Queue[Any]] = None self._queue_size = queue_size self._task: Optional[asyncio.Task[None]] = None # Event loop needs to remain in the same process @@ -210,10 +210,11 @@ def is_alive(self) -> bool: def kill(self) -> None: if self._task: - try: - self._queue.put_nowait(_TERMINATOR) - except asyncio.QueueFull: - logger.debug("async worker queue full, kill failed") + if self._queue is not None: + try: + self._queue.put_nowait(_TERMINATOR) + except asyncio.QueueFull: + logger.debug("async worker queue full, kill failed") # Also cancel any active callback tasks # Avoid modifying the set while cancelling tasks tasks_to_cancel = set(self._active_tasks) @@ -240,6 +241,8 @@ def start(self) -> None: self._task_for_pid = None def full(self) -> bool: + if self._queue is None: + return True return self._queue.full() def _ensure_task(self) -> None: @@ -247,7 +250,7 @@ def _ensure_task(self) -> None: self.start() async def _wait_flush(self, timeout: float, callback: Optional[Any] = None) -> None: - if not self._loop or not self._loop.is_running(): + if not self._loop or not self._loop.is_running() or self._queue is None: return initial_timeout = min(0.1, timeout) @@ -276,7 +279,8 @@ async def flush_async(self, timeout: float, callback: Optional[Any] = None) -> N def submit(self, callback: Callable[[], Any]) -> bool: self._ensure_task() - + if self._queue is None: + return False try: self._queue.put_nowait(callback) return True @@ -284,6 +288,8 @@ def submit(self, callback: Callable[[], Any]) -> bool: return False async def _target(self) -> None: + if self._queue is None: + return while True: callback = await self._queue.get() if callback is _TERMINATOR: @@ -310,5 +316,6 @@ def _on_task_complete(self, task: asyncio.Task[None]) -> None: finally: # Mark the task as done and remove it from the active tasks set # This happens only after the task has completed - self._queue.task_done() + if self._queue is not None: + self._queue.task_done() self._active_tasks.discard(task) From d9f7383a7d83517393514bf82b450d1df57f78f9 Mon Sep 17 00:00:00 2001 From: srothh Date: Wed, 30 Jul 2025 15:01:40 +0200 Subject: [PATCH 19/21] ref(worker): Change to sync flush method that launches task GH-4581 --- sentry_sdk/worker.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index 5d620c4b83..c8dbbb2d73 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -271,11 +271,10 @@ async def _wait_flush(self, timeout: float, callback: Optional[Any] = None) -> N pending = self._queue.qsize() + len(self._active_tasks) logger.error("flush timed out, dropped %s events", pending) - async def flush_async(self, timeout: float, callback: Optional[Any] = None) -> None: - logger.debug("background worker got flush request") - if self.is_alive and timeout > 0.0: - await self._wait_flush(timeout, callback) - logger.debug("background worker flushed") + def flush(self, 
timeout: float, callback: Optional[Any] = None) -> Optional[asyncio.Task[None]]: # type: ignore[override] + if self.is_alive and timeout > 0.0 and self._loop and self._loop.is_running(): + return self._loop.create_task(self._wait_flush(timeout, callback)) + return None def submit(self, callback: Callable[[], Any]) -> bool: self._ensure_task() From d2e647b6553558524f777e9d4d6806e565f09762 Mon Sep 17 00:00:00 2001 From: srothh Date: Wed, 30 Jul 2025 15:56:37 +0200 Subject: [PATCH 20/21] ref(worker): Readd coroutine check for worker callbacks The flush method in the transport enqueues a sync callback for the worker, therefore the check needs to be here after all. GH-4581 --- sentry_sdk/worker.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index c8dbbb2d73..ebf86f412e 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -3,6 +3,7 @@ import os import threading import asyncio +import inspect from time import sleep, time from sentry_sdk._queue import Queue, FullError @@ -305,7 +306,11 @@ async def _target(self) -> None: async def _process_callback(self, callback: Callable[[], Any]) -> None: # Callback is an async coroutine, need to await it - await callback() + if inspect.iscoroutinefunction(callback): + await callback() + else: + # Callback is a sync function, such as _flush_client_reports() + callback() def _on_task_complete(self, task: asyncio.Task[None]) -> None: try: From 859a0e2914c4b8d9a820b7e648dd777300d64e3a Mon Sep 17 00:00:00 2001 From: srothh Date: Thu, 31 Jul 2025 14:06:54 +0200 Subject: [PATCH 21/21] ref(worker): Remove sync callbacks from worker processing for now The callbacks passed to the worker from the transport are all async now, so this is currently not needed. GH-4581 --- sentry_sdk/worker.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index ebf86f412e..c8dbbb2d73 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -3,7 +3,6 @@ import os import threading import asyncio -import inspect from time import sleep, time from sentry_sdk._queue import Queue, FullError @@ -306,11 +305,7 @@ async def _target(self) -> None: async def _process_callback(self, callback: Callable[[], Any]) -> None: # Callback is an async coroutine, need to await it - if inspect.iscoroutinefunction(callback): - await callback() - else: - # Callback is a sync function, such as _flush_client_reports() - callback() + await callback() def _on_task_complete(self, task: asyncio.Task[None]) -> None: try:
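
After the final patch, the AsyncWorker exposes a small surface: start() binds it to the running event loop, submit() enqueues a zero-argument coroutine function, flush() returns an awaitable asyncio.Task (or None), and kill() pushes the _TERMINATOR sentinel and cancels in-flight callback tasks. The following is a minimal usage sketch, not part of the patches; send_envelope and the queue size are hypothetical stand-ins for whatever the async transport would enqueue.

import asyncio

from sentry_sdk.worker import AsyncWorker


async def main() -> None:
    worker = AsyncWorker(queue_size=100)  # queue size chosen arbitrarily for the example
    worker.start()  # needs a running event loop; submit() would also start it lazily

    async def send_envelope() -> None:
        # Hypothetical placeholder for the real async send (e.g. an HTTP request to Sentry).
        await asyncio.sleep(0.01)

    accepted = worker.submit(send_envelope)  # returns False when the queue is full
    assert accepted

    flush_task = worker.flush(timeout=2.0)  # returns an asyncio.Task, not a coroutine
    if flush_task is not None:
        await flush_task  # waits for queued callbacks and in-flight tasks to finish

    worker.kill()  # enqueues the terminator sentinel and cancels remaining callback tasks


asyncio.run(main())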
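Patch 02's _create_worker hook is what makes the worker swappable per transport. The async transport itself is not part of this series, but assuming _create_worker lives on the HTTP transport base class as the diff suggests, an async variant could plug in the task-based worker roughly like this; AsyncHttpTransport is a made-up name for illustration only.

from typing import Any, Dict

from sentry_sdk.transport import HttpTransport
from sentry_sdk.worker import AsyncWorker, Worker


class AsyncHttpTransport(HttpTransport):  # hypothetical subclass, not part of these patches
    def _create_worker(self, options: Dict[str, Any]) -> Worker:
        # Reuse the existing queue-size option; the AsyncWorker binds to the
        # running event loop when started, so it suits an asyncio application.
        return AsyncWorker(queue_size=options["transport_queue_size"])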