Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions src/conductor/web/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,14 @@ async def wait_for_gate_response(self, agent_name: str) -> dict[str, Any]:
"""Wait for a gate response from a web client.

Blocks until a ``gate_response`` message is received via WebSocket
that matches the given agent name. Non-matching messages are
re-queued so they are not lost.
that matches the given agent name.

Non-matching messages are discarded with a warning. Because
conductor only presents one gate at a time, any ``gate_response``
addressed to a different agent is stale (e.g. a duplicate click
from a dashboard that missed the first resolution) and cannot be
delivered — re-queueing would only cause it to be re-examined on
every subsequent gate with no chance of ever matching.

Args:
agent_name: The name of the human_gate agent to wait for.
Expand All @@ -300,10 +306,11 @@ async def wait_for_gate_response(self, agent_name: str) -> dict[str, Any]:
msg = await self._gate_response_queue.get()
if msg.get("agent_name") == agent_name:
return msg
# Not for this agent — put it back
self._gate_response_queue.put_nowait(msg)
# Yield to avoid busy-loop
await asyncio.sleep(0.01)
logger.warning(
"Discarding stale gate_response for agent %r while waiting on %r",
msg.get("agent_name"),
agent_name,
)

# ------------------------------------------------------------------
# Auto-shutdown (--web-bg)
Expand Down
40 changes: 40 additions & 0 deletions tests/test_web/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,46 @@ async def _cancel_serve(self: object) -> None:
await dashboard.start()


class TestWaitForGateResponse:
"""Tests for WebDashboard.wait_for_gate_response stale-message handling."""

@pytest.mark.asyncio
async def test_returns_matching_response(self) -> None:
"""Returns the message whose agent_name matches the awaited agent."""
_, dashboard = _make_dashboard()
await dashboard._gate_response_queue.put(
{"agent_name": "plan_approval", "selected_value": "approved"}
)

msg = await asyncio.wait_for(dashboard.wait_for_gate_response("plan_approval"), timeout=1.0)

assert msg["selected_value"] == "approved"

@pytest.mark.asyncio
async def test_discards_stale_non_matching_messages(self) -> None:
"""Non-matching gate_response messages are discarded, not re-queued.

Regression test for the busy-loop bug where stale messages (e.g. a
duplicate click for a previously-resolved gate) were re-queued with
a 10ms sleep, spinning at ~100Hz forever because ``asyncio.Queue``
has no dedup.
"""
_, dashboard = _make_dashboard()
# Enqueue a stale message followed by the matching one.
await dashboard._gate_response_queue.put(
{"agent_name": "old_gate", "selected_value": "approved"}
)
await dashboard._gate_response_queue.put(
{"agent_name": "current_gate", "selected_value": "rejected"}
)

msg = await asyncio.wait_for(dashboard.wait_for_gate_response("current_gate"), timeout=1.0)

assert msg["agent_name"] == "current_gate"
assert msg["selected_value"] == "rejected"
assert dashboard._gate_response_queue.empty()


async def _short_grace(event: asyncio.Event, delay: float) -> None:
"""Helper for testing: short grace period."""
await asyncio.sleep(delay)
Expand Down
Loading