Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 65 additions & 1 deletion parol6/client/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,19 @@ def __init__(
)
self._ep_lock = asyncio.Lock()

# The event loop that owns the persistent UDP transport, RX queue,
# and async locks. Assigned by ``_ensure_endpoint`` on the first
# call that creates the endpoint. Subsequent calls verify the
# current loop matches and raise a clear error otherwise instead
# of letting ``asyncio.Queue``/``Lock`` surface the cryptic
# "bound to a different event loop" RuntimeError from deep inside
# ``_request_ok_raw``. Most commonly hit when wrapping an
# AsyncRobotClient inside a sync RobotClient (which drives its
# own private thread-loop) and then ALSO calling AsyncRobotClient
# methods directly from a different loop, e.g.
# ``loop.create_task(inner.halt())`` from a NiceGUI page handler.
self._bound_loop: asyncio.AbstractEventLoop | None = None

# Serialize request/response
self._req_lock = asyncio.Lock()

Expand Down Expand Up @@ -354,17 +367,68 @@ def port(self, value: int) -> None:
# --------------- Internal helpers ---------------

async def _ensure_endpoint(self) -> None:
"""Lazily create a persistent asyncio UDP datagram endpoint."""
"""Lazily create a persistent asyncio UDP datagram endpoint.

AsyncRobotClient is single-loop-bound: once an endpoint is
created on a particular event loop, every async method on this
instance must be invoked from that same loop. The UDP transport,
RX queue, request/endpoint locks, and shared status event are
all asyncio primitives bound to the loop that constructed them,
and reusing them from a different loop raises ``RuntimeError:
<Queue ...> is bound to a different event loop`` from deep
inside ``_request_ok_raw``.

This method enforces the single-loop contract up-front with a
clear error referencing the offending loop. The most common
cause is wrapping an AsyncRobotClient inside a sync RobotClient
(which drives its own private thread-loop) and then ALSO
calling AsyncRobotClient methods directly from a different
loop, e.g. ``loop.create_task(inner.halt())`` from a NiceGUI
page handler — the sync wrapper has already bound the client
to its thread-loop, and the page-handler's create_task
schedules the coroutine on the main loop instead.

Callers that genuinely need cross-loop access should either
dispatch through ``asyncio.run_coroutine_threadsafe(coro,
bound_loop)`` (preserving the original binding) or route
every call through the sync RobotClient wrapper.
"""
if self._closed:
raise RuntimeError("AsyncRobotClient is closed")
if self._transport is not None:
# Endpoint already created — verify the calling loop matches
# the one that owns it. Falling through silently would let
# asyncio.Queue.get() raise its own less-helpful error.
try:
current_loop = asyncio.get_running_loop()
except RuntimeError:
# No running loop — sync caller hit an async method
# without an event loop in scope. Let the original error
# surface from the await site (better than fabricating
# a different one here).
return
if (
self._bound_loop is not None
and self._bound_loop is not current_loop
):
raise RuntimeError(
"AsyncRobotClient endpoint is bound to event loop "
f"{self._bound_loop!r} but called from "
f"{current_loop!r}. AsyncRobotClient is single-loop"
"-bound; reusing one instance across loops (e.g. "
"via a sync RobotClient wrapper) requires "
"dispatching all calls through the original loop, "
"or wrapping cross-loop calls in "
"asyncio.run_coroutine_threadsafe(coro, bound_loop)."
)
return
async with self._ep_lock:
if self._closed:
raise RuntimeError("AsyncRobotClient is closed")
if self._transport is not None:
return
loop = asyncio.get_running_loop()
self._bound_loop = loop
self._rx_queue = asyncio.Queue(maxsize=256)
transport, protocol = await loop.create_datagram_endpoint(
lambda: _UDPClientProtocol(self._rx_queue),
Expand Down