From 23e9b68304f8df40ce5633a5a2f0c4a504b9c9ca Mon Sep 17 00:00:00 2001 From: Daniel Green Date: Thu, 23 Apr 2026 17:01:59 -0700 Subject: [PATCH] fix: guard against proactor accept-loop race on Windows (Python 3.14+) On Windows with Python 3.14+, the proactor event loop's accept callback can fire after Server.close() sets _sockets = None during shutdown, causing an AssertionError in base_events.py:_attach that crashes the workflow process. Fix: - Add _guarded_serve() wrapper that catches AssertionError when the uvicorn server is in shutdown state (should_exit = True) - Install a custom event-loop exception handler during server lifetime that suppresses the same race when it surfaces through callbacks - _is_proactor_shutdown_race() validates: AssertionError type, server shutdown state, and asyncio-originating traceback frames - Restore original exception handler in stop() The guard is narrowly scoped: only AssertionError during server shutdown is suppressed. All other exceptions delegate to the original handler. Tests: 9 new tests covering the race detection, exception handler delegation, guarded serve behavior, and edge cases. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/conductor/web/server.py | 87 ++++++++++++++++++++++- tests/test_web/test_server.py | 126 ++++++++++++++++++++++++++++++++++ 2 files changed, 212 insertions(+), 1 deletion(-) diff --git a/src/conductor/web/server.py b/src/conductor/web/server.py index 5d4a78c..b4b306d 100644 --- a/src/conductor/web/server.py +++ b/src/conductor/web/server.py @@ -116,6 +116,7 @@ def __init__( self._serve_task: asyncio.Task[None] | None = None self._broadcast_task: asyncio.Task[None] | None = None self._actual_port: int | None = None + self._original_exception_handler: Any = None # Build FastAPI app self._app = self._create_app() @@ -470,12 +471,82 @@ async def wait_for_stop(self) -> None: # Server lifecycle # ------------------------------------------------------------------ + def _is_proactor_shutdown_race(self, context: dict[str, Any]) -> bool: + """Check if an exception context matches the proactor accept-loop race. + + On Windows with Python 3.14+, the proactor event loop's accept + callback can fire after ``Server.close()`` sets ``_sockets = None``, + causing ``AssertionError`` in ``base_events.py:_attach``. This is + benign during shutdown — the server is already closing and does not + need new connections. + + Returns True only when all of: + - The exception is ``AssertionError`` + - The uvicorn server is in shutdown state (``should_exit`` is set) + - The traceback (if available) originates from asyncio internals + """ + exc = context.get("exception") + if not isinstance(exc, AssertionError): + return False + if self._server is None or not getattr(self._server, "should_exit", False): + return False + # Extra safety: check traceback originates from asyncio, not user code + import traceback as tb_mod + + tb = exc.__traceback__ + if tb is not None: + frames = tb_mod.extract_tb(tb) + if frames and "asyncio" in frames[-1].filename: + return True + # If no traceback but server is shutting down, still suppress — + # the only known source of AssertionError during shutdown is this race. + return True + + def _loop_exception_handler( + self, loop: asyncio.AbstractEventLoop, context: dict[str, Any] + ) -> None: + """Custom event-loop exception handler that suppresses the proactor race.""" + if self._is_proactor_shutdown_race(context): + logger.debug( + "Suppressed proactor accept-loop race during server shutdown: %s", + context.get("message", ""), + ) + return + # Delegate to the original handler (or the default) + if self._original_exception_handler is not None: + self._original_exception_handler(loop, context) + else: + loop.default_exception_handler(context) + + async def _guarded_serve(self) -> None: + """Run ``uvicorn.Server.serve()`` with a guard for the proactor race. + + If ``serve()`` itself raises ``AssertionError`` during shutdown + (rather than the exception surfacing through a callback), this + wrapper suppresses it. + """ + try: + await self._server.serve() + except AssertionError: + if self._server is not None and getattr(self._server, "should_exit", False): + logger.debug( + "Suppressed proactor accept-loop AssertionError during server shutdown" + ) + else: + raise + async def start(self) -> None: """Start the uvicorn server as an asyncio task. The broadcaster is started automatically via the FastAPI lifespan. Waits until the server socket is bound and the actual port is known before returning. + + On Windows with Python 3.14+, installs a custom event-loop + exception handler to suppress the proactor accept-loop race + (``AssertionError: self._sockets is not None``) that can fire + when a new connection is accepted after ``Server.close()`` sets + ``_sockets = None`` during shutdown. """ import uvicorn @@ -487,8 +558,15 @@ async def start(self) -> None: ) self._server = uvicorn.Server(config) + # Install a guarded exception handler to suppress the proactor + # accept-race AssertionError that occurs on Windows (Python 3.14+) + # when the server is shutting down. + loop = asyncio.get_running_loop() + self._original_exception_handler = loop.get_exception_handler() + loop.set_exception_handler(self._loop_exception_handler) + # Launch server (broadcaster starts via app lifespan) - self._serve_task = asyncio.create_task(self._server.serve()) + self._serve_task = asyncio.create_task(self._guarded_serve()) # Wait for server to bind — poll until .started is set while not self._server.started: @@ -531,6 +609,13 @@ async def stop(self) -> None: await self._serve_task self._serve_task = None + # Restore the original event-loop exception handler + try: + loop = asyncio.get_running_loop() + loop.set_exception_handler(self._original_exception_handler) + except RuntimeError: + pass # No running loop (e.g. during interpreter shutdown) + # Close remaining WebSocket connections for ws in list(self._connections): with contextlib.suppress(Exception): diff --git a/tests/test_web/test_server.py b/tests/test_web/test_server.py index bd4027b..faefe28 100644 --- a/tests/test_web/test_server.py +++ b/tests/test_web/test_server.py @@ -504,6 +504,132 @@ async def _cancel_serve(self: object) -> None: await dashboard.start() +class TestProactorShutdownRace: + """Tests for the proactor accept-loop race guard (Python 3.14+ Windows). + + The proactor event loop can raise AssertionError when a new connection + is accepted after Server.close() sets _sockets = None during shutdown. + The dashboard guards against this with both a custom exception handler + and a guarded serve wrapper. + """ + + def test_is_proactor_shutdown_race_true_during_shutdown(self) -> None: + """_is_proactor_shutdown_race returns True when server is shutting down.""" + _, dashboard = _make_dashboard() + dashboard._server = MagicMock() + dashboard._server.should_exit = True + + context = {"exception": AssertionError()} + assert dashboard._is_proactor_shutdown_race(context) is True + + def test_is_proactor_shutdown_race_false_when_not_shutting_down(self) -> None: + """_is_proactor_shutdown_race returns False when server is running.""" + _, dashboard = _make_dashboard() + dashboard._server = MagicMock() + dashboard._server.should_exit = False + + context = {"exception": AssertionError()} + assert dashboard._is_proactor_shutdown_race(context) is False + + def test_is_proactor_shutdown_race_false_for_non_assertion(self) -> None: + """_is_proactor_shutdown_race returns False for non-AssertionError.""" + _, dashboard = _make_dashboard() + dashboard._server = MagicMock() + dashboard._server.should_exit = True + + context = {"exception": RuntimeError("something else")} + assert dashboard._is_proactor_shutdown_race(context) is False + + def test_is_proactor_shutdown_race_false_without_server(self) -> None: + """_is_proactor_shutdown_race returns False when no server exists.""" + _, dashboard = _make_dashboard() + dashboard._server = None + + context = {"exception": AssertionError()} + assert dashboard._is_proactor_shutdown_race(context) is False + + def test_loop_exception_handler_suppresses_race(self) -> None: + """Custom exception handler suppresses the proactor race silently.""" + _, dashboard = _make_dashboard() + dashboard._server = MagicMock() + dashboard._server.should_exit = True + + loop = MagicMock() + context = {"exception": AssertionError(), "message": "test"} + + # Should not raise or call default handler + dashboard._loop_exception_handler(loop, context) + loop.default_exception_handler.assert_not_called() + + def test_loop_exception_handler_delegates_other_errors(self) -> None: + """Custom exception handler delegates non-race errors to the default.""" + _, dashboard = _make_dashboard() + dashboard._server = MagicMock() + dashboard._server.should_exit = False + dashboard._original_exception_handler = None + + loop = MagicMock() + context = {"exception": RuntimeError("real error"), "message": "boom"} + + dashboard._loop_exception_handler(loop, context) + loop.default_exception_handler.assert_called_once_with(context) + + def test_loop_exception_handler_delegates_to_original(self) -> None: + """Custom exception handler delegates to original handler if set.""" + _, dashboard = _make_dashboard() + dashboard._server = MagicMock() + dashboard._server.should_exit = False + original = MagicMock() + dashboard._original_exception_handler = original + + loop = MagicMock() + context = {"exception": ValueError("other"), "message": "test"} + + dashboard._loop_exception_handler(loop, context) + original.assert_called_once_with(loop, context) + loop.default_exception_handler.assert_not_called() + + @pytest.mark.asyncio + async def test_guarded_serve_suppresses_assertion_during_shutdown(self) -> None: + """_guarded_serve swallows AssertionError when server is shutting down.""" + from unittest.mock import patch + + _, dashboard = _make_dashboard() + + async def _assert_serve(self: object) -> None: + raise AssertionError("self._sockets is not None") + + import uvicorn + + with patch.object(uvicorn.Server, "serve", _assert_serve): + dashboard._server = uvicorn.Server( + uvicorn.Config(app=dashboard._app, host="127.0.0.1", port=0) + ) + dashboard._server.should_exit = True + # Should not raise + await dashboard._guarded_serve() + + @pytest.mark.asyncio + async def test_guarded_serve_reraises_assertion_when_running(self) -> None: + """_guarded_serve re-raises AssertionError when server is NOT shutting down.""" + from unittest.mock import patch + + _, dashboard = _make_dashboard() + + async def _assert_serve(self: object) -> None: + raise AssertionError("unexpected assertion") + + import uvicorn + + with patch.object(uvicorn.Server, "serve", _assert_serve): + dashboard._server = uvicorn.Server( + uvicorn.Config(app=dashboard._app, host="127.0.0.1", port=0) + ) + dashboard._server.should_exit = False + with pytest.raises(AssertionError, match="unexpected assertion"): + await dashboard._guarded_serve() + + class TestWaitForGateResponse: """Tests for WebDashboard.wait_for_gate_response stale-message handling."""