Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 69 additions & 18 deletions src/conductor/providers/copilot.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,20 @@

logger = logging.getLogger(__name__)

# Events that should NOT reset the idle-detection clock. These are internal
# bookkeeping / lifecycle events that can fire continuously (e.g. during stuck
# MCP initialization) without reflecting real agent progress.
# - pending_messages.modified: fires repeatedly while MCP messages are queued
# - session.start: one-time lifecycle event at session setup
# - session.info: one-time informational metadata at session setup
_IDLE_IGNORED_EVENTS: frozenset[str] = frozenset(
{
"pending_messages.modified",
"session.start",
"session.info",
}
)

# Try to import the Copilot SDK
try:
from copilot import CopilotClient
Expand Down Expand Up @@ -66,12 +80,15 @@ class IdleRecoveryConfig:
Attributes:
idle_timeout_seconds: Time without any SDK events before considering session idle.
max_recovery_attempts: Maximum number of "continue" messages to send before failing.
max_session_seconds: Hard wall-clock limit on total session duration. Prevents
sessions from hanging indefinitely even if non-idle events keep flowing.
recovery_prompt: Template for the recovery message sent to stuck sessions.
Use {last_activity} placeholder for context about what was happening.
"""

idle_timeout_seconds: float = 300.0 # 5 minutes
max_recovery_attempts: int = 3
max_session_seconds: float = 1800.0 # 30 minutes
recovery_prompt: str = (
"It appears you may have gotten stuck or stopped responding. "
"Your last activity was: {last_activity}. "
Expand Down Expand Up @@ -156,6 +173,7 @@ def __init__(
self._default_model = model or "gpt-4o"
self._mcp_servers = mcp_servers or {}
self._started = False
self._start_lock = asyncio.Lock()
self._idle_recovery_config = idle_recovery_config or IdleRecoveryConfig()
self._temperature = temperature
self._session_ids: dict[str, str] = {}
Expand Down Expand Up @@ -393,9 +411,8 @@ async def _execute_sdk_call(
is_retryable=False,
)

# Ensure client is started
if not self._started:
await self._ensure_client_started()
# Ensure client is started; lock serializes concurrent first calls
await self._ensure_client_started()

model = agent.model or self._default_model

Expand Down Expand Up @@ -651,9 +668,12 @@ def on_event(event: Any) -> None:
tool_info = f" tool={tn}"
logger.debug("sdk_event: %s%s", event_type, tool_info)

# Update last activity on EVERY event (this is key for idle detection!)
last_activity_ref[0] = event_type
last_activity_ref[2] = time.monotonic()
# Only update the idle clock for events that indicate real agent
# work. Bookkeeping/lifecycle events are excluded via the
# module-level _IDLE_IGNORED_EVENTS constant.
if event_type not in _IDLE_IGNORED_EVENTS:
last_activity_ref[0] = event_type
last_activity_ref[2] = time.monotonic()

if event_type == "assistant.message":
response_content = event.data.content
Expand Down Expand Up @@ -1291,17 +1311,42 @@ async def _wait_with_idle_detection(
for tracking last activity.

Raises:
ProviderError: If all recovery attempts are exhausted.
ProviderError: If all recovery attempts are exhausted, or if the
session exceeds max_session_seconds wall-clock duration.
"""
recovery_attempts = 0
idle_timeout = self._idle_recovery_config.idle_timeout_seconds
session_start = time.monotonic()
max_session = self._idle_recovery_config.max_session_seconds

while True:
# Check if done was already set (avoids race where session.idle
# arrived between a previous done.clear() and the next wait).
if done.is_set():
return

# Hard wall-clock limit — prevents sessions from hanging
# indefinitely even if events keep flowing (e.g. repeated
# pending_messages.modified during stuck MCP initialization).
# Note: this check runs at idle_timeout_seconds granularity, so
# actual max duration is approximately max_session + idle_timeout.
elapsed = time.monotonic() - session_start
if elapsed > max_session:
last_event_type = last_activity_ref[0]
last_tool_call = last_activity_ref[1]
time_since_last = time.monotonic() - last_activity_ref[2]
stuck_info = self._build_stuck_info(last_event_type, last_tool_call)
raise ProviderError(
f"Session exceeded maximum duration of {max_session:.0f}s. "
f"{stuck_info} Last real event {time_since_last:.0f}s ago.",
suggestion=(
f"The session ran for {elapsed:.0f}s without completing. "
"This may indicate a stuck MCP server, infinite tool loop, "
"or provider issue. Enable --log-file to capture full debug output."
),
is_retryable=False, # Don't retry — same root cause will recur
)

try:
# Wait for done with idle timeout
await asyncio.wait_for(
Expand Down Expand Up @@ -1366,17 +1411,23 @@ async def _wait_with_idle_detection(
done.clear()

async def _ensure_client_started(self) -> None:
"""Ensure the Copilot client is started."""
if self._client is None:
self._client = CopilotClient()
if not self._started:
await self._client.start()
self._started = True

# Ensure subprocess pipes are in blocking mode to prevent
# BlockingIOError on large payloads. The asyncio event loop
# may set O_NONBLOCK on inherited file descriptors.
self._fix_pipe_blocking_mode()
"""Ensure the Copilot client is started.

Uses a lock to prevent concurrent agents (parallel groups or
for-each iterations) from racing to start the same client
subprocess multiple times.
"""
async with self._start_lock:
if self._client is None:
self._client = CopilotClient()
if not self._started:
await self._client.start()
self._started = True

# Ensure subprocess pipes are in blocking mode to prevent
# BlockingIOError on large payloads. The asyncio event loop
# may set O_NONBLOCK on inherited file descriptors.
self._fix_pipe_blocking_mode()

def _fix_pipe_blocking_mode(self) -> None:
"""Clear O_NONBLOCK on the Copilot CLI subprocess pipes.
Expand Down
Loading
Loading