Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,21 @@ jobs:
- name: Install dependencies
run: uv sync --group dev

- name: Remove bundled Copilot CLI binary
run: |
# The github-copilot-sdk >=0.1.23 bundles a CLI binary that tries to
# authenticate with GitHub on startup. Remove it so tests that invoke
# the real CLI path fail fast instead of hanging on auth.
find .venv -path '*/copilot/bin/copilot*' -delete 2>/dev/null || true

- name: Run tests with coverage
run: uv run pytest --cov=src/conductor --cov-report=xml --cov-report=term-missing -m "not real_api"
timeout-minutes: 10
run: uv run pytest --cov=src/conductor --cov-report=xml --cov-report=term-missing -m "not real_api and not performance"
env:
# Fake API key for mock tests to prevent accidental real API calls.
# Real API tests (marked with @pytest.mark.real_api) are excluded from CI
# via the '-m "not real_api"' filter and must be run manually with valid key.
# via the marker filter. Performance tests are also excluded as they
# contain timing-sensitive assertions that are flaky on shared CI runners.
# This ensures CI tests are fast, free, and don't leak credentials.
ANTHROPIC_API_KEY: "sk-ant-test-fake-key-for-mocking"

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ dependencies = [
"ruamel.yaml>=0.18.0",
"jinja2>=3.1.0",
"simpleeval>=1.0.0",
"github-copilot-sdk>=0.1.0,<0.1.31", # 0.1.31+ has permission-denied regression, see #27
"github-copilot-sdk>=0.1.28,<0.1.31", # >=0.1.28 for on_permission_request; <0.1.31 regression, see #27
"anthropic>=0.77.0,<1.0.0",
"mcp>=1.0.0",
"fastapi>=0.115.0",
Expand Down
35 changes: 29 additions & 6 deletions src/conductor/config/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,13 +426,23 @@ class AgentDef(BaseModel):
"""Per-script timeout in seconds."""

max_session_seconds: float | None = Field(None, ge=1.0)
"""Maximum wall-clock duration for this agent's Copilot SDK session in seconds.
"""Maximum wall-clock duration for this agent's session in seconds.

Overrides the workflow-level runtime.max_session_seconds for this agent.
Only applies to Copilot provider agents (not script or human_gate).
Only applies to provider-backed agents (not script or human_gate).

Example: A source-gathering agent that should finish in ~60s can set
max_session_seconds: 60 instead of using the default 30-minute timeout.
max_session_seconds: 60 instead of using the default timeout.
"""

max_agent_iterations: int | None = Field(None, ge=1, le=500)
"""Maximum tool-use iterations for this agent execution.

Overrides the workflow-level runtime.max_agent_iterations for this agent.
Only applies to provider-backed agents (not script or human_gate).

Example: A complex coding agent that needs many tool calls can set
max_agent_iterations: 200 instead of using the default limit.
"""

@field_validator("timeout")
Expand Down Expand Up @@ -473,6 +483,8 @@ def validate_agent_type(self) -> AgentDef:
raise ValueError("script agents cannot have 'options'")
if self.max_session_seconds:
raise ValueError("script agents cannot have 'max_session_seconds'")
if self.max_agent_iterations is not None:
raise ValueError("script agents cannot have 'max_agent_iterations'")
return self


Expand Down Expand Up @@ -579,15 +591,26 @@ class RuntimeConfig(BaseModel):
"""

max_session_seconds: float | None = Field(None, ge=1.0)
"""Maximum wall-clock duration for Copilot SDK sessions in seconds.
"""Maximum wall-clock duration for agent sessions in seconds.

Sets the default max_session_seconds for all agents using the Copilot provider.
Sets the default max_session_seconds for all agents.
Individual agents can override this with their own max_session_seconds field.

Default is None, which uses the Copilot provider's built-in default (1800s / 30 min).
Default is None, which uses the provider's built-in default
(Copilot: 1800s / 30 min, Claude: unlimited).
Set a lower value for workflows where agents should finish quickly.
"""

max_agent_iterations: int | None = Field(None, ge=1, le=500)
"""Maximum tool-use iterations per agent execution.

Caps the number of tool-use roundtrips an agent can perform in a single
execution. This prevents runaway tool loops.

Default is None, which uses the provider's built-in default
(Claude: 50, Copilot: unlimited).
"""


class WorkflowDef(BaseModel):
"""Top-level workflow configuration."""
Expand Down
41 changes: 40 additions & 1 deletion src/conductor/providers/claude.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import json
import logging
import random
import time
from typing import TYPE_CHECKING, Any, Protocol

from pydantic import BaseModel
Expand Down Expand Up @@ -112,6 +113,8 @@ def __init__(
timeout: float = 600.0,
retry_config: RetryConfig | None = None,
mcp_servers: dict[str, Any] | None = None,
max_agent_iterations: int | None = None,
max_session_seconds: float | None = None,
) -> None:
"""Initialize the Claude provider.

Expand All @@ -127,6 +130,10 @@ def __init__(
retry_config: Optional retry configuration. Uses default if not provided.
mcp_servers: Optional MCP server configurations for tool support.
Each server config should have: command, args, env (optional).
max_agent_iterations: Maximum tool-use iterations per agent execution.
Defaults to 50 if not specified.
max_session_seconds: Maximum wall-clock duration for agent sessions.
Defaults to None (unlimited).

Raises:
ProviderError: If SDK is not installed.
Expand Down Expand Up @@ -157,6 +164,10 @@ def __init__(
self._retry_history: list[dict[str, Any]] = [] # For testing/debugging retries
self._max_parse_recovery_attempts = 2 # Max retry attempts for malformed JSON
self._max_schema_depth = 10 # Max nesting depth for recursive schema building
self._default_max_agent_iterations = (
max_agent_iterations if max_agent_iterations is not None else 50
)
self._default_max_session_seconds = max_session_seconds

# MCP server configuration for tool support
self._mcp_servers_config = mcp_servers
Expand Down Expand Up @@ -590,6 +601,18 @@ async def _execute_with_retry(
temperature = self._default_temperature
max_tokens = self._default_max_tokens

# Resolve per-agent iteration and session limits
max_agent_iterations = (
agent.max_agent_iterations
if agent.max_agent_iterations is not None
else self._default_max_agent_iterations
)
max_session_seconds = (
agent.max_session_seconds
if agent.max_session_seconds is not None
else self._default_max_session_seconds
)

# Validate max_tokens against model-specific limits
if "haiku" in model.lower():
if max_tokens > 4096:
Expand Down Expand Up @@ -639,6 +662,8 @@ async def _execute_with_retry(
tools=request_tools,
output_schema=agent.output,
has_output_schema=has_output_schema,
max_iterations=max_agent_iterations,
max_session_seconds=max_session_seconds,
interrupt_signal=interrupt_signal,
event_callback=event_callback,
)
Expand Down Expand Up @@ -882,7 +907,8 @@ async def _execute_agentic_loop(
tools: list[dict[str, Any]] | None,
output_schema: dict[str, OutputField] | None,
has_output_schema: bool,
max_iterations: int = 10,
max_iterations: int = 50,
max_session_seconds: float | None = None,
interrupt_signal: asyncio.Event | None = None,
event_callback: EventCallback | None = None,
) -> tuple[ClaudeResponse, int | None, bool]:
Expand All @@ -907,6 +933,8 @@ async def _execute_agentic_loop(
output_schema: Expected output schema.
has_output_schema: Whether agent has output schema defined.
max_iterations: Maximum number of tool-use iterations to prevent infinite loops.
max_session_seconds: Maximum wall-clock duration for this agentic loop.
None means no time limit.
interrupt_signal: Optional event that signals a mid-agent interrupt.
event_callback: Optional callback for streaming SDK events upstream.

Expand All @@ -920,11 +948,22 @@ async def _execute_agentic_loop(
working_messages = list(messages)
total_tokens = 0
iteration = 0
session_start = time.monotonic()

while iteration < max_iterations:
iteration += 1
logger.debug(f"Agentic loop iteration {iteration}/{max_iterations}")

# Check wall-clock session timeout
if max_session_seconds is not None:
elapsed = time.monotonic() - session_start
if elapsed > max_session_seconds:
raise ProviderError(
f"Agent exceeded maximum session duration of {max_session_seconds:.0f}s "
f"after {iteration} tool-use iterations",
is_retryable=False,
)

# Emit turn start event
if event_callback:
try:
Expand Down
48 changes: 46 additions & 2 deletions src/conductor/providers/copilot.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def __init__(
mcp_servers: dict[str, Any] | None = None,
idle_recovery_config: IdleRecoveryConfig | None = None,
temperature: float | None = None,
max_agent_iterations: int | None = None,
) -> None:
"""Initialize the Copilot provider.

Expand All @@ -164,6 +165,8 @@ def __init__(
idle_recovery_config: Optional idle detection and recovery configuration.
Uses default if not provided.
temperature: Default temperature for generation (0.0-1.0). Optional.
max_agent_iterations: Maximum tool-use iterations per agent execution.
None means no iteration limit (only wall-clock timeout applies).
"""
self._client: Any = None # Will hold Copilot SDK client
self._mock_handler = mock_handler
Expand All @@ -176,6 +179,7 @@ def __init__(
self._start_lock = asyncio.Lock()
self._idle_recovery_config = idle_recovery_config or IdleRecoveryConfig()
self._temperature = temperature
self._default_max_agent_iterations = max_agent_iterations
self._session_ids: dict[str, str] = {}
self._resume_session_ids: dict[str, str] = {}
self._interrupted_session: Any = None
Expand Down Expand Up @@ -490,6 +494,13 @@ async def _execute_sdk_call(
agent.max_session_seconds or self._idle_recovery_config.max_session_seconds
)

# Resolve per-agent max_agent_iterations override
effective_max_iterations = (
agent.max_agent_iterations
if agent.max_agent_iterations is not None
else self._default_max_agent_iterations
)

session_destroyed = False
try:
# Send initial prompt and get response
Expand All @@ -501,6 +512,7 @@ async def _execute_sdk_call(
interrupt_signal=interrupt_signal,
event_callback=event_callback,
max_session_seconds=effective_max_session,
max_agent_iterations=effective_max_iterations,
)
response_content = sdk_response.content

Expand Down Expand Up @@ -628,6 +640,7 @@ async def _send_and_wait(
interrupt_signal: asyncio.Event | None = None,
event_callback: EventCallback | None = None,
max_session_seconds: float | None = None,
max_agent_iterations: int | None = None,
) -> SDKResponse:
"""Send a prompt to the session and wait for response.

Expand All @@ -642,6 +655,8 @@ async def _send_and_wait(
event_callback: Optional callback for streaming SDK events upstream.
max_session_seconds: Per-agent wall-clock session limit override.
If None, uses the provider-level IdleRecoveryConfig default.
max_agent_iterations: Maximum tool-use iterations for this session.
None means no iteration limit.

Returns:
SDKResponse with content and usage data. If interrupted,
Expand All @@ -661,6 +676,9 @@ async def _send_and_wait(
# Mutable container for usage data: [input_tokens, output_tokens, cache_read, cache_write]
usage_ref: list[int | None] = [None, None, None, None]

# Mutable container for tool iteration counting
tool_iteration_ref: list[int] = [0]

def on_event(event: Any) -> None:
nonlocal response_content, error_message
event_type = event.type.value if hasattr(event.type, "value") else str(event.type)
Expand Down Expand Up @@ -712,6 +730,8 @@ def on_event(event: Any) -> None:
event.data, "name", "unknown"
)
last_activity_ref[1] = tool_name
# Count tool-use iterations
tool_iteration_ref[0] += 1

# Forward structured events upstream via event_callback
if event_callback is not None:
Expand Down Expand Up @@ -753,6 +773,8 @@ def on_event(event: Any) -> None:
full_enabled,
last_activity_ref,
max_session_seconds=max_session_seconds,
tool_iteration_ref=tool_iteration_ref,
max_agent_iterations=max_agent_iterations,
)

if error_message:
Expand Down Expand Up @@ -1309,6 +1331,8 @@ async def _wait_with_idle_detection(
full_enabled: bool,
last_activity_ref: list[Any],
max_session_seconds: float | None = None,
tool_iteration_ref: list[int] | None = None,
max_agent_iterations: int | None = None,
) -> None:
"""Wait for session completion with idle detection and recovery.

Expand All @@ -1326,10 +1350,14 @@ async def _wait_with_idle_detection(
for tracking last activity.
max_session_seconds: Per-agent wall-clock session limit override.
If None, uses the provider-level IdleRecoveryConfig default.
tool_iteration_ref: Single-element mutable list ``[count]`` used as a counter;
the event handler increments it each time a tool execution starts.
max_agent_iterations: Maximum tool-use iterations allowed.
None means no iteration limit.

Raises:
ProviderError: If all recovery attempts are exhausted, or if the
session exceeds max_session_seconds wall-clock duration.
ProviderError: If all recovery attempts are exhausted, if the
session exceeds max_session_seconds wall-clock duration, or
if max_agent_iterations is exceeded.
"""
recovery_attempts = 0
idle_timeout = self._idle_recovery_config.idle_timeout_seconds
Expand Down Expand Up @@ -1364,6 +1392,22 @@ async def _wait_with_idle_detection(
is_retryable=False, # Don't retry — same root cause will recur
)

# Check tool-use iteration limit
if (
max_agent_iterations is not None
and tool_iteration_ref is not None
and tool_iteration_ref[0] > max_agent_iterations
):
raise ProviderError(
f"Agent exceeded maximum tool-use iterations ({max_agent_iterations})",
suggestion=(
"The agent performed too many tool calls. "
"Increase max_agent_iterations in runtime config or per-agent "
"settings if the agent legitimately needs more iterations."
),
is_retryable=False,
)

try:
# Wait for done with idle timeout
await asyncio.wait_for(
Expand Down
10 changes: 8 additions & 2 deletions src/conductor/providers/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ async def create_provider(
max_tokens: int | None = None,
timeout: float | None = None,
max_session_seconds: float | None = None,
max_agent_iterations: int | None = None,
) -> AgentProvider:
"""Factory function to create the appropriate provider.

Expand All @@ -41,8 +42,8 @@ async def create_provider(
temperature: Default temperature for generation (0.0-1.0).
max_tokens: Maximum output tokens.
timeout: Request timeout in seconds.
max_session_seconds: Maximum wall-clock duration for Copilot SDK sessions.
Only applies to the Copilot provider.
max_session_seconds: Maximum wall-clock duration for agent sessions.
max_agent_iterations: Maximum tool-use iterations per agent execution.
None defers to the selected provider's built-in default.

Returns:
Configured AgentProvider instance.
Expand All @@ -67,6 +68,7 @@ async def create_provider(
model=default_model,
temperature=temperature,
idle_recovery_config=idle_recovery_config,
max_agent_iterations=max_agent_iterations,
)
case "openai-agents":
raise ProviderError(
Expand All @@ -85,6 +87,8 @@ async def create_provider(
max_tokens=max_tokens,
timeout=timeout if timeout is not None else 600.0,
mcp_servers=mcp_servers,
max_agent_iterations=max_agent_iterations,
max_session_seconds=max_session_seconds,
)
case _:
raise ProviderError(
Expand Down Expand Up @@ -135,6 +139,7 @@ async def create_provider(
max_tokens = getattr(runtime_config, "max_tokens", None)
timeout = getattr(runtime_config, "timeout", None)
max_session_seconds = getattr(runtime_config, "max_session_seconds", None)
max_agent_iterations = getattr(runtime_config, "max_agent_iterations", None)

return await create_provider(
provider_type=provider_type,
Expand All @@ -144,4 +149,5 @@ async def create_provider(
max_tokens=max_tokens,
timeout=timeout,
max_session_seconds=max_session_seconds,
max_agent_iterations=max_agent_iterations,
)
Loading
Loading