Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 86 additions & 1 deletion src/conductor/web/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ def __init__(
self._serve_task: asyncio.Task[None] | None = None
self._broadcast_task: asyncio.Task[None] | None = None
self._actual_port: int | None = None
self._original_exception_handler: Any = None

# Build FastAPI app
self._app = self._create_app()
Expand Down Expand Up @@ -470,12 +471,82 @@ async def wait_for_stop(self) -> None:
# Server lifecycle
# ------------------------------------------------------------------

def _is_proactor_shutdown_race(self, context: dict[str, Any]) -> bool:
    """Check if an exception context matches the proactor accept-loop race.

    On Windows with Python 3.14+, the proactor event loop's accept
    callback can fire after ``Server.close()`` sets ``_sockets = None``,
    causing ``AssertionError`` in ``base_events.py:_attach``. This is
    benign during shutdown — the server is already closing and does not
    need new connections.

    Args:
        context: The event-loop exception context; only its
            ``"exception"`` entry is inspected.

    Returns:
        True only when all of:

        - The exception is ``AssertionError``
        - The uvicorn server is in shutdown state (``should_exit`` is set)
        - The traceback, if one is attached, ends in a frame whose file
          path contains ``asyncio``

        An ``AssertionError`` without a traceback is still treated as the
        race while the server is shutting down.
    """
    exc = context.get("exception")
    if not isinstance(exc, AssertionError):
        return False
    if self._server is None or not getattr(self._server, "should_exit", False):
        return False

    tb = exc.__traceback__
    if tb is None:
        # Nothing to inspect: during shutdown the only known source of a
        # traceback-less AssertionError is this race, so suppress it.
        return True

    # Walk to the innermost frame (where the exception was raised). An
    # assertion raised by non-asyncio code must NOT be swallowed, even
    # while the server is shutting down.
    while tb.tb_next is not None:
        tb = tb.tb_next
    return "asyncio" in tb.tb_frame.f_code.co_filename

def _loop_exception_handler(
    self, loop: asyncio.AbstractEventLoop, context: dict[str, Any]
) -> None:
    """Event-loop exception handler that silences the proactor shutdown race.

    Contexts recognised by ``_is_proactor_shutdown_race`` are logged at
    debug level and dropped. Everything else is forwarded to the handler
    stored in ``_original_exception_handler`` or, when none was stored,
    to the loop's default handler.
    """
    if not self._is_proactor_shutdown_race(context):
        previous = self._original_exception_handler
        if previous is None:
            loop.default_exception_handler(context)
        else:
            previous(loop, context)
        return

    logger.debug(
        "Suppressed proactor accept-loop race during server shutdown: %s",
        context.get("message", ""),
    )

async def _guarded_serve(self) -> None:
    """Await ``uvicorn.Server.serve()`` while tolerating the proactor race.

    An ``AssertionError`` escaping ``serve()`` itself (as opposed to one
    reported through an event-loop callback) is swallowed when the server
    has ``should_exit`` set; in any other state it propagates unchanged.
    """
    try:
        await self._server.serve()
    except AssertionError:
        server = self._server
        shutting_down = server is not None and getattr(server, "should_exit", False)
        if not shutting_down:
            raise
        logger.debug(
            "Suppressed proactor accept-loop AssertionError during server shutdown"
        )

async def start(self) -> None:
"""Start the uvicorn server as an asyncio task.

The broadcaster is started automatically via the FastAPI lifespan.
Waits until the server socket is bound and the actual port is
known before returning.

On Windows with Python 3.14+, installs a custom event-loop
exception handler to suppress the proactor accept-loop race
(``AssertionError: self._sockets is not None``) that can fire
when a new connection is accepted after ``Server.close()`` sets
``_sockets = None`` during shutdown.
"""
import uvicorn

Expand All @@ -487,8 +558,15 @@ async def start(self) -> None:
)
self._server = uvicorn.Server(config)

# Install a guarded exception handler to suppress the proactor
# accept-race AssertionError that occurs on Windows (Python 3.14+)
# when the server is shutting down.
loop = asyncio.get_running_loop()
self._original_exception_handler = loop.get_exception_handler()
loop.set_exception_handler(self._loop_exception_handler)

# Launch server (broadcaster starts via app lifespan)
self._serve_task = asyncio.create_task(self._server.serve())
self._serve_task = asyncio.create_task(self._guarded_serve())

# Wait for server to bind — poll until .started is set
while not self._server.started:
Expand Down Expand Up @@ -531,6 +609,13 @@ async def stop(self) -> None:
await self._serve_task
self._serve_task = None

# Restore the original event-loop exception handler
try:
loop = asyncio.get_running_loop()
loop.set_exception_handler(self._original_exception_handler)
except RuntimeError:
pass # No running loop (e.g. during interpreter shutdown)

# Close remaining WebSocket connections
for ws in list(self._connections):
with contextlib.suppress(Exception):
Expand Down
126 changes: 126 additions & 0 deletions tests/test_web/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,132 @@ async def _cancel_serve(self: object) -> None:
await dashboard.start()


class TestProactorShutdownRace:
    """Exercise the guard against the proactor accept-loop shutdown race.

    On Python 3.14+ Windows the proactor event loop may raise AssertionError
    if a connection is accepted once Server.close() has set _sockets = None.
    The dashboard defends against that in two places — a custom loop
    exception handler and a wrapper around serve() — and both are covered
    here.
    """

    @staticmethod
    def _dashboard_with_server(should_exit: bool):
        """Build a dashboard whose ``_server`` is a mock with *should_exit* set."""
        _, dashboard = _make_dashboard()
        dashboard._server = MagicMock()
        dashboard._server.should_exit = should_exit
        return dashboard

    def test_is_proactor_shutdown_race_true_during_shutdown(self) -> None:
        """An AssertionError while the server is shutting down is the race."""
        dashboard = self._dashboard_with_server(should_exit=True)

        ctx = {"exception": AssertionError()}
        assert dashboard._is_proactor_shutdown_race(ctx) is True

    def test_is_proactor_shutdown_race_false_when_not_shutting_down(self) -> None:
        """An AssertionError while the server is still running is not the race."""
        dashboard = self._dashboard_with_server(should_exit=False)

        ctx = {"exception": AssertionError()}
        assert dashboard._is_proactor_shutdown_race(ctx) is False

    def test_is_proactor_shutdown_race_false_for_non_assertion(self) -> None:
        """Exceptions other than AssertionError are never the race."""
        dashboard = self._dashboard_with_server(should_exit=True)

        ctx = {"exception": RuntimeError("something else")}
        assert dashboard._is_proactor_shutdown_race(ctx) is False

    def test_is_proactor_shutdown_race_false_without_server(self) -> None:
        """With no server object at all, nothing is treated as the race."""
        _, dashboard = _make_dashboard()
        dashboard._server = None

        ctx = {"exception": AssertionError()}
        assert dashboard._is_proactor_shutdown_race(ctx) is False

    def test_loop_exception_handler_suppresses_race(self) -> None:
        """The handler drops the race without touching the default handler."""
        dashboard = self._dashboard_with_server(should_exit=True)
        fake_loop = MagicMock()

        # Must neither raise nor fall through to the default handler.
        dashboard._loop_exception_handler(
            fake_loop, {"exception": AssertionError(), "message": "test"}
        )

        fake_loop.default_exception_handler.assert_not_called()

    def test_loop_exception_handler_delegates_other_errors(self) -> None:
        """Without a saved handler, other errors reach the loop default."""
        dashboard = self._dashboard_with_server(should_exit=False)
        dashboard._original_exception_handler = None
        fake_loop = MagicMock()
        ctx = {"exception": RuntimeError("real error"), "message": "boom"}

        dashboard._loop_exception_handler(fake_loop, ctx)

        fake_loop.default_exception_handler.assert_called_once_with(ctx)

    def test_loop_exception_handler_delegates_to_original(self) -> None:
        """With a saved handler, other errors go to it and not the default."""
        dashboard = self._dashboard_with_server(should_exit=False)
        saved_handler = MagicMock()
        dashboard._original_exception_handler = saved_handler
        fake_loop = MagicMock()
        ctx = {"exception": ValueError("other"), "message": "test"}

        dashboard._loop_exception_handler(fake_loop, ctx)

        saved_handler.assert_called_once_with(fake_loop, ctx)
        fake_loop.default_exception_handler.assert_not_called()

    @pytest.mark.asyncio
    async def test_guarded_serve_suppresses_assertion_during_shutdown(self) -> None:
        """_guarded_serve absorbs AssertionError once should_exit is set."""
        from unittest.mock import patch

        import uvicorn

        async def _failing_serve(self: object) -> None:
            raise AssertionError("self._sockets is not None")

        _, dashboard = _make_dashboard()

        with patch.object(uvicorn.Server, "serve", _failing_serve):
            config = uvicorn.Config(app=dashboard._app, host="127.0.0.1", port=0)
            dashboard._server = uvicorn.Server(config)
            dashboard._server.should_exit = True
            # Completing without an exception is the assertion here.
            await dashboard._guarded_serve()

    @pytest.mark.asyncio
    async def test_guarded_serve_reraises_assertion_when_running(self) -> None:
        """_guarded_serve lets AssertionError through while the server runs."""
        from unittest.mock import patch

        import uvicorn

        async def _failing_serve(self: object) -> None:
            raise AssertionError("unexpected assertion")

        _, dashboard = _make_dashboard()

        with patch.object(uvicorn.Server, "serve", _failing_serve):
            config = uvicorn.Config(app=dashboard._app, host="127.0.0.1", port=0)
            dashboard._server = uvicorn.Server(config)
            dashboard._server.should_exit = False
            with pytest.raises(AssertionError, match="unexpected assertion"):
                await dashboard._guarded_serve()


class TestWaitForGateResponse:
"""Tests for WebDashboard.wait_for_gate_response stale-message handling."""

Expand Down