From 73643b96a211675e78b3a0b07bf09c1f4040b7d8 Mon Sep 17 00:00:00 2001 From: Jason Robert Date: Fri, 6 Mar 2026 14:16:28 -0500 Subject: [PATCH 1/4] fix(copilot): harden idle detection, add session timeout, and fix startup race - Exclude bookkeeping events (pending_messages.modified, session.start, session.info) from resetting the idle clock so the watchdog can detect hangs during stuck MCP initialization. - Add a hard wall-clock max_session_seconds limit (default 30 min) that terminates sessions even when non-idle events keep flowing. - Protect _ensure_client_started() with an asyncio.Lock to prevent concurrent for-each items from racing to start the subprocess twice. Co-Authored-By: Claude Opus 4.6 --- src/conductor/providers/copilot.py | 65 ++++++++++++++++++++++++------ 1 file changed, 52 insertions(+), 13 deletions(-) diff --git a/src/conductor/providers/copilot.py b/src/conductor/providers/copilot.py index 213e7eb..a1a90bf 100644 --- a/src/conductor/providers/copilot.py +++ b/src/conductor/providers/copilot.py @@ -66,12 +66,15 @@ class IdleRecoveryConfig: Attributes: idle_timeout_seconds: Time without any SDK events before considering session idle. max_recovery_attempts: Maximum number of "continue" messages to send before failing. + max_session_seconds: Hard wall-clock limit on total session duration. Prevents + sessions from hanging indefinitely even if non-idle events keep flowing. recovery_prompt: Template for the recovery message sent to stuck sessions. Use {last_activity} placeholder for context about what was happening. """ idle_timeout_seconds: float = 300.0 # 5 minutes max_recovery_attempts: int = 3 + max_session_seconds: float = 1800.0 # 30 minutes recovery_prompt: str = ( "It appears you may have gotten stuck or stopped responding. " "Your last activity was: {last_activity}. " @@ -156,6 +159,7 @@ def __init__( self._default_model = model or "gpt-4o" self._mcp_servers = mcp_servers or {} self._started = False + self._start_lock = asyncio.Lock() self._idle_recovery_config = idle_recovery_config or IdleRecoveryConfig() self._temperature = temperature self._session_ids: dict[str, str] = {} @@ -651,9 +655,18 @@ def on_event(event: Any) -> None: tool_info = f" tool={tn}" logger.debug("sdk_event: %s%s", event_type, tool_info) - # Update last activity on EVERY event (this is key for idle detection!) - last_activity_ref[0] = event_type - last_activity_ref[2] = time.monotonic() + # Update last activity only for events that indicate real agent work. + # Internal bookkeeping events like "pending_messages.modified" can fire + # continuously during stuck MCP initialization, which would reset the + # idle clock and prevent the watchdog from ever detecting a hang. + _IDLE_IGNORED_EVENTS = { + "pending_messages.modified", + "session.start", + "session.info", + } + if event_type not in _IDLE_IGNORED_EVENTS: + last_activity_ref[0] = event_type + last_activity_ref[2] = time.monotonic() if event_type == "assistant.message": response_content = event.data.content @@ -1295,6 +1308,8 @@ async def _wait_with_idle_detection( """ recovery_attempts = 0 idle_timeout = self._idle_recovery_config.idle_timeout_seconds + session_start = time.monotonic() + max_session = self._idle_recovery_config.max_session_seconds while True: # Check if done was already set (avoids race where session.idle @@ -1302,6 +1317,25 @@ async def _wait_with_idle_detection( if done.is_set(): return + # Hard wall-clock limit — prevents sessions from hanging + # indefinitely even if events keep flowing (e.g. repeated + # pending_messages.modified during stuck MCP initialization). + elapsed = time.monotonic() - session_start + if elapsed > max_session: + last_event_type = last_activity_ref[0] + last_tool_call = last_activity_ref[1] + stuck_info = self._build_stuck_info(last_event_type, last_tool_call) + raise ProviderError( + f"Session exceeded maximum duration of {max_session:.0f}s. " + f"{stuck_info}", + suggestion=( + f"The session ran for {elapsed:.0f}s without completing. " + "This may indicate a stuck MCP server, infinite tool loop, " + "or provider issue. Enable --log-file to capture full debug output." + ), + is_retryable=True, + ) + try: # Wait for done with idle timeout await asyncio.wait_for( @@ -1366,17 +1400,22 @@ async def _wait_with_idle_detection( done.clear() async def _ensure_client_started(self) -> None: - """Ensure the Copilot client is started.""" - if self._client is None: - self._client = CopilotClient() - if not self._started: - await self._client.start() - self._started = True + """Ensure the Copilot client is started. - # Ensure subprocess pipes are in blocking mode to prevent - # BlockingIOError on large payloads. The asyncio event loop - # may set O_NONBLOCK on inherited file descriptors. - self._fix_pipe_blocking_mode() + Uses a lock to prevent concurrent for-each items from racing + to start the same client subprocess multiple times. + """ + async with self._start_lock: + if self._client is None: + self._client = CopilotClient() + if not self._started: + await self._client.start() + self._started = True + + # Ensure subprocess pipes are in blocking mode to prevent + # BlockingIOError on large payloads. The asyncio event loop + # may set O_NONBLOCK on inherited file descriptors. + self._fix_pipe_blocking_mode() def _fix_pipe_blocking_mode(self) -> None: """Clear O_NONBLOCK on the Copilot CLI subprocess pipes. From dd6f865665f4566863670b0ed16c9d34a83442d0 Mon Sep 17 00:00:00 2001 From: Jason Robert Date: Fri, 6 Mar 2026 14:18:55 -0500 Subject: [PATCH 2/4] style(providers): fix ruff formatting in copilot provider --- src/conductor/providers/copilot.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/conductor/providers/copilot.py b/src/conductor/providers/copilot.py index a1a90bf..0a99c34 100644 --- a/src/conductor/providers/copilot.py +++ b/src/conductor/providers/copilot.py @@ -1326,8 +1326,7 @@ async def _wait_with_idle_detection( last_tool_call = last_activity_ref[1] stuck_info = self._build_stuck_info(last_event_type, last_tool_call) raise ProviderError( - f"Session exceeded maximum duration of {max_session:.0f}s. " - f"{stuck_info}", + f"Session exceeded maximum duration of {max_session:.0f}s. {stuck_info}", suggestion=( f"The session ran for {elapsed:.0f}s without completing. " "This may indicate a stuck MCP server, infinite tool loop, " From 195eb44e3e586bf4320ac2273ceaca5e5fc7066e Mon Sep 17 00:00:00 2001 From: Jason Robert Date: Fri, 6 Mar 2026 14:31:20 -0500 Subject: [PATCH 3/4] fix(copilot): address review feedback on idle detection and startup race PR MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Extract _IDLE_IGNORED_EVENTS to module-level frozenset to avoid per-event set allocation inside on_event() callback - Change is_retryable to False on wall-clock timeout ProviderError to prevent retrying sessions that will hit the same root cause - Remove redundant pre-lock `if not self._started` guard since the internal lock already handles the fast path - Add time-since-last-event to session timeout error for diagnostics - Document wall-clock check granularity (max ≈ max_session + idle_timeout) - Broaden _ensure_client_started docstring to cover parallel groups - Update Raises docstring for wall-clock timeout path - Add 10 new tests: _IDLE_IGNORED_EVENTS membership, session timeout behavior, and concurrent _ensure_client_started race safety Co-Authored-By: Claude Opus 4.6 --- src/conductor/providers/copilot.py | 47 ++-- tests/test_providers/test_idle_recovery.py | 237 +++++++++++++++++++++ 2 files changed, 267 insertions(+), 17 deletions(-) diff --git a/src/conductor/providers/copilot.py b/src/conductor/providers/copilot.py index 0a99c34..2a2db4f 100644 --- a/src/conductor/providers/copilot.py +++ b/src/conductor/providers/copilot.py @@ -24,6 +24,20 @@ logger = logging.getLogger(__name__) +# Events that should NOT reset the idle-detection clock. These are internal +# bookkeeping / lifecycle events that can fire continuously (e.g. during stuck +# MCP initialization) without reflecting real agent progress. +# - pending_messages.modified: fires repeatedly while MCP messages are queued +# - session.start: one-time lifecycle event at session setup +# - session.info: one-time informational metadata at session setup +_IDLE_IGNORED_EVENTS: frozenset[str] = frozenset( + { + "pending_messages.modified", + "session.start", + "session.info", + } +) + # Try to import the Copilot SDK try: from copilot import CopilotClient @@ -397,9 +411,8 @@ async def _execute_sdk_call( is_retryable=False, ) - # Ensure client is started - if not self._started: - await self._ensure_client_started() + # Ensure client is started (lock handles fast-path internally) + await self._ensure_client_started() model = agent.model or self._default_model @@ -655,15 +668,9 @@ def on_event(event: Any) -> None: tool_info = f" tool={tn}" logger.debug("sdk_event: %s%s", event_type, tool_info) - # Update last activity only for events that indicate real agent work. - # Internal bookkeeping events like "pending_messages.modified" can fire - # continuously during stuck MCP initialization, which would reset the - # idle clock and prevent the watchdog from ever detecting a hang. - _IDLE_IGNORED_EVENTS = { - "pending_messages.modified", - "session.start", - "session.info", - } + # Only update the idle clock for events that indicate real agent + # work. Bookkeeping/lifecycle events are excluded via the + # module-level _IDLE_IGNORED_EVENTS constant. if event_type not in _IDLE_IGNORED_EVENTS: last_activity_ref[0] = event_type last_activity_ref[2] = time.monotonic() @@ -1304,7 +1311,8 @@ async def _wait_with_idle_detection( for tracking last activity. Raises: - ProviderError: If all recovery attempts are exhausted. + ProviderError: If all recovery attempts are exhausted, or if the + session exceeds max_session_seconds wall-clock duration. """ recovery_attempts = 0 idle_timeout = self._idle_recovery_config.idle_timeout_seconds @@ -1320,19 +1328,23 @@ async def _wait_with_idle_detection( # Hard wall-clock limit — prevents sessions from hanging # indefinitely even if events keep flowing (e.g. repeated # pending_messages.modified during stuck MCP initialization). + # Note: this check runs at idle_timeout_seconds granularity, so + # actual max duration is approximately max_session + idle_timeout. elapsed = time.monotonic() - session_start if elapsed > max_session: last_event_type = last_activity_ref[0] last_tool_call = last_activity_ref[1] + time_since_last = time.monotonic() - last_activity_ref[2] stuck_info = self._build_stuck_info(last_event_type, last_tool_call) raise ProviderError( - f"Session exceeded maximum duration of {max_session:.0f}s. {stuck_info}", + f"Session exceeded maximum duration of {max_session:.0f}s. " + f"{stuck_info} Last real event {time_since_last:.0f}s ago.", suggestion=( f"The session ran for {elapsed:.0f}s without completing. " "This may indicate a stuck MCP server, infinite tool loop, " "or provider issue. Enable --log-file to capture full debug output." ), - is_retryable=True, + is_retryable=False, # Don't retry — same root cause will recur ) try: @@ -1401,8 +1413,9 @@ async def _wait_with_idle_detection( async def _ensure_client_started(self) -> None: """Ensure the Copilot client is started. - Uses a lock to prevent concurrent for-each items from racing - to start the same client subprocess multiple times. + Uses a lock to prevent concurrent agents (parallel groups or + for-each iterations) from racing to start the same client + subprocess multiple times. """ async with self._start_lock: if self._client is None: diff --git a/tests/test_providers/test_idle_recovery.py b/tests/test_providers/test_idle_recovery.py index 7649c8b..3574ed0 100644 --- a/tests/test_providers/test_idle_recovery.py +++ b/tests/test_providers/test_idle_recovery.py @@ -10,6 +10,7 @@ from conductor.config.schema import AgentDef from conductor.exceptions import ProviderError from conductor.providers.copilot import ( + _IDLE_IGNORED_EVENTS, CopilotProvider, IdleRecoveryConfig, ) @@ -28,6 +29,7 @@ def test_default_values(self) -> None: config = IdleRecoveryConfig() assert config.idle_timeout_seconds == 300.0 assert config.max_recovery_attempts == 3 + assert config.max_session_seconds == 1800.0 assert "{last_activity}" in config.recovery_prompt def test_custom_values(self) -> None: @@ -35,10 +37,12 @@ def test_custom_values(self) -> None: config = IdleRecoveryConfig( idle_timeout_seconds=60.0, max_recovery_attempts=5, + max_session_seconds=600.0, recovery_prompt="Custom prompt: {last_activity}", ) assert config.idle_timeout_seconds == 60.0 assert config.max_recovery_attempts == 5 + assert config.max_session_seconds == 600.0 assert config.recovery_prompt == "Custom prompt: {last_activity}" @@ -503,3 +507,236 @@ def simulate_callback(): assert ref[0] == "tool.execution_start" assert ref[1] == "web_search" assert ref[2] == 123.456 + + +class TestIdleIgnoredEvents: + """Tests for the _IDLE_IGNORED_EVENTS constant and filtering behavior.""" + + def test_ignored_events_is_frozenset(self) -> None: + """Test that _IDLE_IGNORED_EVENTS is an immutable frozenset.""" + assert isinstance(_IDLE_IGNORED_EVENTS, frozenset) + + def test_ignored_events_contains_expected_members(self) -> None: + """Test that all expected bookkeeping events are in the set.""" + assert "pending_messages.modified" in _IDLE_IGNORED_EVENTS + assert "session.start" in _IDLE_IGNORED_EVENTS + assert "session.info" in _IDLE_IGNORED_EVENTS + + def test_real_events_not_in_ignored_set(self) -> None: + """Test that real agent-work events are NOT in the ignored set.""" + real_events = [ + "assistant.message", + "assistant.reasoning", + "tool.execution_start", + "tool.execution_complete", + "session.idle", + ] + for event in real_events: + assert event not in _IDLE_IGNORED_EVENTS, f"{event} should not be ignored" + + +class TestSessionTimeout: + """Tests for max_session_seconds wall-clock timeout.""" + + @pytest.mark.asyncio + async def test_session_timeout_raises_provider_error(self) -> None: + """Test that exceeding max_session_seconds raises ProviderError.""" + config = IdleRecoveryConfig( + idle_timeout_seconds=0.05, + max_recovery_attempts=10, + max_session_seconds=0.01, # Very short — will fire quickly + ) + provider = CopilotProvider( + mock_handler=stub_handler, + idle_recovery_config=config, + ) + + done = asyncio.Event() # Never set + mock_session = MagicMock() + mock_session.send = AsyncMock() + + last_activity_ref: list[Any] = [None, None, time.monotonic()] + + with pytest.raises(ProviderError) as exc_info: + await provider._wait_with_idle_detection( + done=done, + session=mock_session, + verbose_enabled=False, + full_enabled=False, + last_activity_ref=last_activity_ref, + ) + + assert "exceeded maximum duration" in str(exc_info.value) + assert not exc_info.value.is_retryable + + @pytest.mark.asyncio + async def test_session_timeout_includes_time_since_last_event(self) -> None: + """Test that the timeout error includes time since last real event.""" + config = IdleRecoveryConfig( + idle_timeout_seconds=0.05, + max_recovery_attempts=10, + max_session_seconds=0.01, + ) + provider = CopilotProvider( + mock_handler=stub_handler, + idle_recovery_config=config, + ) + + done = asyncio.Event() + mock_session = MagicMock() + mock_session.send = AsyncMock() + + last_activity_ref: list[Any] = ["tool.execution_start", "stuck_tool", time.monotonic()] + + with pytest.raises(ProviderError) as exc_info: + await provider._wait_with_idle_detection( + done=done, + session=mock_session, + verbose_enabled=False, + full_enabled=False, + last_activity_ref=last_activity_ref, + ) + + error_msg = str(exc_info.value) + assert "stuck_tool" in error_msg + assert "Last real event" in error_msg + assert "ago" in error_msg + + @pytest.mark.asyncio + async def test_session_timeout_fires_even_with_flowing_events(self) -> None: + """Test that wall-clock timeout fires even when events keep flowing. + + This is the key distinction from idle timeout: even if non-ignored + events keep resetting the idle clock, the hard cap still fires. + """ + config = IdleRecoveryConfig( + idle_timeout_seconds=60.0, # Very high — won't trigger idle + max_recovery_attempts=10, + max_session_seconds=0.15, # Short wall-clock limit + ) + provider = CopilotProvider( + mock_handler=stub_handler, + idle_recovery_config=config, + ) + + done = asyncio.Event() + mock_session = MagicMock() + mock_session.send = AsyncMock() + + last_activity_ref: list[Any] = ["assistant.message", None, time.monotonic()] + + # Keep updating the activity timestamp to simulate flowing events + async def simulate_events() -> None: + while not done.is_set(): + await asyncio.sleep(0.02) + last_activity_ref[2] = time.monotonic() + + with pytest.raises(ProviderError) as exc_info: + await asyncio.gather( + provider._wait_with_idle_detection( + done=done, + session=mock_session, + verbose_enabled=False, + full_enabled=False, + last_activity_ref=last_activity_ref, + ), + simulate_events(), + ) + + assert "exceeded maximum duration" in str(exc_info.value) + + @pytest.mark.asyncio + async def test_session_completes_before_timeout(self) -> None: + """Test that sessions completing before max_session_seconds are fine.""" + config = IdleRecoveryConfig( + idle_timeout_seconds=10.0, + max_session_seconds=10.0, # Won't be reached + ) + provider = CopilotProvider( + mock_handler=stub_handler, + idle_recovery_config=config, + ) + + done = asyncio.Event() + mock_session = MagicMock() + + last_activity_ref: list[Any] = [None, None, time.monotonic()] + + async def complete_quickly() -> None: + await asyncio.sleep(0.02) + done.set() + + # Should not raise + await asyncio.gather( + provider._wait_with_idle_detection( + done=done, + session=mock_session, + verbose_enabled=False, + full_enabled=False, + last_activity_ref=last_activity_ref, + ), + complete_quickly(), + ) + + +class TestStartupRace: + """Tests for asyncio.Lock in _ensure_client_started.""" + + def test_start_lock_exists(self) -> None: + """Test that the provider has a _start_lock attribute.""" + provider = CopilotProvider(mock_handler=stub_handler) + assert isinstance(provider._start_lock, asyncio.Lock) + + @pytest.mark.asyncio + async def test_concurrent_ensure_started_calls_start_once(self) -> None: + """Test that concurrent _ensure_client_started calls only start once. + + Simulates the for-each / parallel group race: multiple coroutines + all call _ensure_client_started() concurrently, but start() should + only be invoked once. + """ + provider = CopilotProvider(mock_handler=stub_handler) + + start_call_count = 0 + + class MockClient: + async def start(self_inner) -> None: + nonlocal start_call_count + start_call_count += 1 + # Simulate slow startup to widen the race window + await asyncio.sleep(0.05) + + provider._client = MockClient() + provider._started = False + + # Stub out _fix_pipe_blocking_mode since we don't have real pipes + provider._fix_pipe_blocking_mode = lambda: None # type: ignore[assignment] + + # Launch 5 concurrent calls + await asyncio.gather(*[provider._ensure_client_started() for _ in range(5)]) + + assert start_call_count == 1 + assert provider._started is True + + @pytest.mark.asyncio + async def test_fix_pipe_blocking_mode_called_once(self) -> None: + """Test that _fix_pipe_blocking_mode is called exactly once under concurrency.""" + provider = CopilotProvider(mock_handler=stub_handler) + + fix_pipe_count = 0 + + class MockClient: + async def start(self_inner) -> None: + await asyncio.sleep(0.02) + + def mock_fix_pipe() -> None: + nonlocal fix_pipe_count + fix_pipe_count += 1 + + provider._client = MockClient() + provider._started = False + provider._fix_pipe_blocking_mode = mock_fix_pipe # type: ignore[assignment] + + await asyncio.gather(*[provider._ensure_client_started() for _ in range(3)]) + + assert fix_pipe_count == 1 From 6ff80b76a3789b107657b6cce873b87635991084 Mon Sep 17 00:00:00 2001 From: Jason Robert Date: Fri, 6 Mar 2026 14:37:57 -0500 Subject: [PATCH 4/4] fix(tests): speed up slow timeout test and add missing assertion MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Change idle_timeout_seconds from 60s to 0.05s in test_session_timeout_fires_even_with_flowing_events so the loop iterates quickly instead of blocking for a full minute (62s → 2s) - Add missing is_retryable assertion to flowing-events timeout test - Fix misleading "fast-path" comment on _ensure_client_started call Co-Authored-By: Claude Opus 4.6 --- src/conductor/providers/copilot.py | 2 +- tests/test_providers/test_idle_recovery.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/conductor/providers/copilot.py b/src/conductor/providers/copilot.py index 2a2db4f..ff8c60d 100644 --- a/src/conductor/providers/copilot.py +++ b/src/conductor/providers/copilot.py @@ -411,7 +411,7 @@ async def _execute_sdk_call( is_retryable=False, ) - # Ensure client is started (lock handles fast-path internally) + # Ensure client is started; lock serializes concurrent first calls await self._ensure_client_started() model = agent.model or self._default_model diff --git a/tests/test_providers/test_idle_recovery.py b/tests/test_providers/test_idle_recovery.py index 3574ed0..e91c6da 100644 --- a/tests/test_providers/test_idle_recovery.py +++ b/tests/test_providers/test_idle_recovery.py @@ -610,7 +610,7 @@ async def test_session_timeout_fires_even_with_flowing_events(self) -> None: events keep resetting the idle clock, the hard cap still fires. """ config = IdleRecoveryConfig( - idle_timeout_seconds=60.0, # Very high — won't trigger idle + idle_timeout_seconds=0.05, # Short — loop iterates quickly max_recovery_attempts=10, max_session_seconds=0.15, # Short wall-clock limit ) @@ -644,6 +644,7 @@ async def simulate_events() -> None: ) assert "exceeded maximum duration" in str(exc_info.value) + assert not exc_info.value.is_retryable @pytest.mark.asyncio async def test_session_completes_before_timeout(self) -> None: