diff --git a/src/conductor/providers/copilot.py b/src/conductor/providers/copilot.py index 213e7eb..ff8c60d 100644 --- a/src/conductor/providers/copilot.py +++ b/src/conductor/providers/copilot.py @@ -24,6 +24,20 @@ logger = logging.getLogger(__name__) +# Events that should NOT reset the idle-detection clock. These are internal +# bookkeeping / lifecycle events that can fire continuously (e.g. during stuck +# MCP initialization) without reflecting real agent progress. +# - pending_messages.modified: fires repeatedly while MCP messages are queued +# - session.start: one-time lifecycle event at session setup +# - session.info: one-time informational metadata at session setup +_IDLE_IGNORED_EVENTS: frozenset[str] = frozenset( + { + "pending_messages.modified", + "session.start", + "session.info", + } +) + # Try to import the Copilot SDK try: from copilot import CopilotClient @@ -66,12 +80,15 @@ class IdleRecoveryConfig: Attributes: idle_timeout_seconds: Time without any SDK events before considering session idle. max_recovery_attempts: Maximum number of "continue" messages to send before failing. + max_session_seconds: Hard wall-clock limit on total session duration. Prevents + sessions from hanging indefinitely even if non-idle events keep flowing. recovery_prompt: Template for the recovery message sent to stuck sessions. Use {last_activity} placeholder for context about what was happening. """ idle_timeout_seconds: float = 300.0 # 5 minutes max_recovery_attempts: int = 3 + max_session_seconds: float = 1800.0 # 30 minutes recovery_prompt: str = ( "It appears you may have gotten stuck or stopped responding. " "Your last activity was: {last_activity}. " @@ -156,6 +173,7 @@ def __init__( self._default_model = model or "gpt-4o" self._mcp_servers = mcp_servers or {} self._started = False + self._start_lock = asyncio.Lock() self._idle_recovery_config = idle_recovery_config or IdleRecoveryConfig() self._temperature = temperature self._session_ids: dict[str, str] = {} @@ -393,9 +411,8 @@ async def _execute_sdk_call( is_retryable=False, ) - # Ensure client is started - if not self._started: - await self._ensure_client_started() + # Ensure client is started; lock serializes concurrent first calls + await self._ensure_client_started() model = agent.model or self._default_model @@ -651,9 +668,12 @@ def on_event(event: Any) -> None: tool_info = f" tool={tn}" logger.debug("sdk_event: %s%s", event_type, tool_info) - # Update last activity on EVERY event (this is key for idle detection!) - last_activity_ref[0] = event_type - last_activity_ref[2] = time.monotonic() + # Only update the idle clock for events that indicate real agent + # work. Bookkeeping/lifecycle events are excluded via the + # module-level _IDLE_IGNORED_EVENTS constant. + if event_type not in _IDLE_IGNORED_EVENTS: + last_activity_ref[0] = event_type + last_activity_ref[2] = time.monotonic() if event_type == "assistant.message": response_content = event.data.content @@ -1291,10 +1311,13 @@ async def _wait_with_idle_detection( for tracking last activity. Raises: - ProviderError: If all recovery attempts are exhausted. + ProviderError: If all recovery attempts are exhausted, or if the + session exceeds max_session_seconds wall-clock duration. """ recovery_attempts = 0 idle_timeout = self._idle_recovery_config.idle_timeout_seconds + session_start = time.monotonic() + max_session = self._idle_recovery_config.max_session_seconds while True: # Check if done was already set (avoids race where session.idle @@ -1302,6 +1325,28 @@ async def _wait_with_idle_detection( if done.is_set(): return + # Hard wall-clock limit — prevents sessions from hanging + # indefinitely even if events keep flowing (e.g. repeated + # pending_messages.modified during stuck MCP initialization). + # Note: this check runs at idle_timeout_seconds granularity, so + # actual max duration is approximately max_session + idle_timeout. + elapsed = time.monotonic() - session_start + if elapsed > max_session: + last_event_type = last_activity_ref[0] + last_tool_call = last_activity_ref[1] + time_since_last = time.monotonic() - last_activity_ref[2] + stuck_info = self._build_stuck_info(last_event_type, last_tool_call) + raise ProviderError( + f"Session exceeded maximum duration of {max_session:.0f}s. " + f"{stuck_info} Last real event {time_since_last:.0f}s ago.", + suggestion=( + f"The session ran for {elapsed:.0f}s without completing. " + "This may indicate a stuck MCP server, infinite tool loop, " + "or provider issue. Enable --log-file to capture full debug output." + ), + is_retryable=False, # Don't retry — same root cause will recur + ) + try: # Wait for done with idle timeout await asyncio.wait_for( @@ -1366,17 +1411,23 @@ async def _wait_with_idle_detection( done.clear() async def _ensure_client_started(self) -> None: - """Ensure the Copilot client is started.""" - if self._client is None: - self._client = CopilotClient() - if not self._started: - await self._client.start() - self._started = True - - # Ensure subprocess pipes are in blocking mode to prevent - # BlockingIOError on large payloads. The asyncio event loop - # may set O_NONBLOCK on inherited file descriptors. - self._fix_pipe_blocking_mode() + """Ensure the Copilot client is started. + + Uses a lock to prevent concurrent agents (parallel groups or + for-each iterations) from racing to start the same client + subprocess multiple times. + """ + async with self._start_lock: + if self._client is None: + self._client = CopilotClient() + if not self._started: + await self._client.start() + self._started = True + + # Ensure subprocess pipes are in blocking mode to prevent + # BlockingIOError on large payloads. The asyncio event loop + # may set O_NONBLOCK on inherited file descriptors. + self._fix_pipe_blocking_mode() def _fix_pipe_blocking_mode(self) -> None: """Clear O_NONBLOCK on the Copilot CLI subprocess pipes. diff --git a/tests/test_providers/test_idle_recovery.py b/tests/test_providers/test_idle_recovery.py index 7649c8b..e91c6da 100644 --- a/tests/test_providers/test_idle_recovery.py +++ b/tests/test_providers/test_idle_recovery.py @@ -10,6 +10,7 @@ from conductor.config.schema import AgentDef from conductor.exceptions import ProviderError from conductor.providers.copilot import ( + _IDLE_IGNORED_EVENTS, CopilotProvider, IdleRecoveryConfig, ) @@ -28,6 +29,7 @@ def test_default_values(self) -> None: config = IdleRecoveryConfig() assert config.idle_timeout_seconds == 300.0 assert config.max_recovery_attempts == 3 + assert config.max_session_seconds == 1800.0 assert "{last_activity}" in config.recovery_prompt def test_custom_values(self) -> None: @@ -35,10 +37,12 @@ def test_custom_values(self) -> None: config = IdleRecoveryConfig( idle_timeout_seconds=60.0, max_recovery_attempts=5, + max_session_seconds=600.0, recovery_prompt="Custom prompt: {last_activity}", ) assert config.idle_timeout_seconds == 60.0 assert config.max_recovery_attempts == 5 + assert config.max_session_seconds == 600.0 assert config.recovery_prompt == "Custom prompt: {last_activity}" @@ -503,3 +507,237 @@ def simulate_callback(): assert ref[0] == "tool.execution_start" assert ref[1] == "web_search" assert ref[2] == 123.456 + + +class TestIdleIgnoredEvents: + """Tests for the _IDLE_IGNORED_EVENTS constant and filtering behavior.""" + + def test_ignored_events_is_frozenset(self) -> None: + """Test that _IDLE_IGNORED_EVENTS is an immutable frozenset.""" + assert isinstance(_IDLE_IGNORED_EVENTS, frozenset) + + def test_ignored_events_contains_expected_members(self) -> None: + """Test that all expected bookkeeping events are in the set.""" + assert "pending_messages.modified" in _IDLE_IGNORED_EVENTS + assert "session.start" in _IDLE_IGNORED_EVENTS + assert "session.info" in _IDLE_IGNORED_EVENTS + + def test_real_events_not_in_ignored_set(self) -> None: + """Test that real agent-work events are NOT in the ignored set.""" + real_events = [ + "assistant.message", + "assistant.reasoning", + "tool.execution_start", + "tool.execution_complete", + "session.idle", + ] + for event in real_events: + assert event not in _IDLE_IGNORED_EVENTS, f"{event} should not be ignored" + + +class TestSessionTimeout: + """Tests for max_session_seconds wall-clock timeout.""" + + @pytest.mark.asyncio + async def test_session_timeout_raises_provider_error(self) -> None: + """Test that exceeding max_session_seconds raises ProviderError.""" + config = IdleRecoveryConfig( + idle_timeout_seconds=0.05, + max_recovery_attempts=10, + max_session_seconds=0.01, # Very short — will fire quickly + ) + provider = CopilotProvider( + mock_handler=stub_handler, + idle_recovery_config=config, + ) + + done = asyncio.Event() # Never set + mock_session = MagicMock() + mock_session.send = AsyncMock() + + last_activity_ref: list[Any] = [None, None, time.monotonic()] + + with pytest.raises(ProviderError) as exc_info: + await provider._wait_with_idle_detection( + done=done, + session=mock_session, + verbose_enabled=False, + full_enabled=False, + last_activity_ref=last_activity_ref, + ) + + assert "exceeded maximum duration" in str(exc_info.value) + assert not exc_info.value.is_retryable + + @pytest.mark.asyncio + async def test_session_timeout_includes_time_since_last_event(self) -> None: + """Test that the timeout error includes time since last real event.""" + config = IdleRecoveryConfig( + idle_timeout_seconds=0.05, + max_recovery_attempts=10, + max_session_seconds=0.01, + ) + provider = CopilotProvider( + mock_handler=stub_handler, + idle_recovery_config=config, + ) + + done = asyncio.Event() + mock_session = MagicMock() + mock_session.send = AsyncMock() + + last_activity_ref: list[Any] = ["tool.execution_start", "stuck_tool", time.monotonic()] + + with pytest.raises(ProviderError) as exc_info: + await provider._wait_with_idle_detection( + done=done, + session=mock_session, + verbose_enabled=False, + full_enabled=False, + last_activity_ref=last_activity_ref, + ) + + error_msg = str(exc_info.value) + assert "stuck_tool" in error_msg + assert "Last real event" in error_msg + assert "ago" in error_msg + + @pytest.mark.asyncio + async def test_session_timeout_fires_even_with_flowing_events(self) -> None: + """Test that wall-clock timeout fires even when events keep flowing. + + This is the key distinction from idle timeout: even if non-ignored + events keep resetting the idle clock, the hard cap still fires. + """ + config = IdleRecoveryConfig( + idle_timeout_seconds=0.05, # Short — loop iterates quickly + max_recovery_attempts=10, + max_session_seconds=0.15, # Short wall-clock limit + ) + provider = CopilotProvider( + mock_handler=stub_handler, + idle_recovery_config=config, + ) + + done = asyncio.Event() + mock_session = MagicMock() + mock_session.send = AsyncMock() + + last_activity_ref: list[Any] = ["assistant.message", None, time.monotonic()] + + # Keep updating the activity timestamp to simulate flowing events + async def simulate_events() -> None: + while not done.is_set(): + await asyncio.sleep(0.02) + last_activity_ref[2] = time.monotonic() + + with pytest.raises(ProviderError) as exc_info: + await asyncio.gather( + provider._wait_with_idle_detection( + done=done, + session=mock_session, + verbose_enabled=False, + full_enabled=False, + last_activity_ref=last_activity_ref, + ), + simulate_events(), + ) + + assert "exceeded maximum duration" in str(exc_info.value) + assert not exc_info.value.is_retryable + + @pytest.mark.asyncio + async def test_session_completes_before_timeout(self) -> None: + """Test that sessions completing before max_session_seconds are fine.""" + config = IdleRecoveryConfig( + idle_timeout_seconds=10.0, + max_session_seconds=10.0, # Won't be reached + ) + provider = CopilotProvider( + mock_handler=stub_handler, + idle_recovery_config=config, + ) + + done = asyncio.Event() + mock_session = MagicMock() + + last_activity_ref: list[Any] = [None, None, time.monotonic()] + + async def complete_quickly() -> None: + await asyncio.sleep(0.02) + done.set() + + # Should not raise + await asyncio.gather( + provider._wait_with_idle_detection( + done=done, + session=mock_session, + verbose_enabled=False, + full_enabled=False, + last_activity_ref=last_activity_ref, + ), + complete_quickly(), + ) + + +class TestStartupRace: + """Tests for asyncio.Lock in _ensure_client_started.""" + + def test_start_lock_exists(self) -> None: + """Test that the provider has a _start_lock attribute.""" + provider = CopilotProvider(mock_handler=stub_handler) + assert isinstance(provider._start_lock, asyncio.Lock) + + @pytest.mark.asyncio + async def test_concurrent_ensure_started_calls_start_once(self) -> None: + """Test that concurrent _ensure_client_started calls only start once. + + Simulates the for-each / parallel group race: multiple coroutines + all call _ensure_client_started() concurrently, but start() should + only be invoked once. + """ + provider = CopilotProvider(mock_handler=stub_handler) + + start_call_count = 0 + + class MockClient: + async def start(self_inner) -> None: + nonlocal start_call_count + start_call_count += 1 + # Simulate slow startup to widen the race window + await asyncio.sleep(0.05) + + provider._client = MockClient() + provider._started = False + + # Stub out _fix_pipe_blocking_mode since we don't have real pipes + provider._fix_pipe_blocking_mode = lambda: None # type: ignore[assignment] + + # Launch 5 concurrent calls + await asyncio.gather(*[provider._ensure_client_started() for _ in range(5)]) + + assert start_call_count == 1 + assert provider._started is True + + @pytest.mark.asyncio + async def test_fix_pipe_blocking_mode_called_once(self) -> None: + """Test that _fix_pipe_blocking_mode is called exactly once under concurrency.""" + provider = CopilotProvider(mock_handler=stub_handler) + + fix_pipe_count = 0 + + class MockClient: + async def start(self_inner) -> None: + await asyncio.sleep(0.02) + + def mock_fix_pipe() -> None: + nonlocal fix_pipe_count + fix_pipe_count += 1 + + provider._client = MockClient() + provider._started = False + provider._fix_pipe_blocking_mode = mock_fix_pipe # type: ignore[assignment] + + await asyncio.gather(*[provider._ensure_client_started() for _ in range(3)]) + + assert fix_pipe_count == 1