Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 75 additions & 1 deletion src/conductor/providers/claude.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,12 @@ async def execute(
"""
# Use retry logic wrapper for execution
return await self._execute_with_retry(
agent, context, rendered_prompt, tools, interrupt_signal=interrupt_signal
agent,
context,
rendered_prompt,
tools,
interrupt_signal=interrupt_signal,
event_callback=event_callback,
)

def _is_retryable_error(self, exception: Exception) -> bool:
Expand Down Expand Up @@ -543,6 +548,7 @@ async def _execute_with_retry(
rendered_prompt: str,
tools: list[str] | None = None,
interrupt_signal: asyncio.Event | None = None,
event_callback: EventCallback | None = None,
) -> AgentOutput:
"""Execute with exponential backoff retry logic and MCP tool support.

Expand All @@ -558,6 +564,7 @@ async def _execute_with_retry(
rendered_prompt: Jinja2-rendered user prompt.
tools: List of tool names available to this agent (for MCP tool filtering).
interrupt_signal: Optional event for mid-agent interrupt signaling.
event_callback: Optional callback for streaming SDK events upstream.

Returns:
Normalized AgentOutput with structured content.
Expand Down Expand Up @@ -633,6 +640,7 @@ async def _execute_with_retry(
output_schema=agent.output,
has_output_schema=has_output_schema,
interrupt_signal=interrupt_signal,
event_callback=event_callback,
)

# Handle partial output from mid-agent interrupt
Expand Down Expand Up @@ -876,6 +884,7 @@ async def _execute_agentic_loop(
has_output_schema: bool,
max_iterations: int = 10,
interrupt_signal: asyncio.Event | None = None,
event_callback: EventCallback | None = None,
) -> tuple[ClaudeResponse, int | None, bool]:
"""Execute an agentic loop that handles MCP tool calls.

Expand All @@ -899,6 +908,7 @@ async def _execute_agentic_loop(
has_output_schema: Whether agent has output schema defined.
max_iterations: Maximum number of tool-use iterations to prevent infinite loops.
interrupt_signal: Optional event that signals a mid-agent interrupt.
event_callback: Optional callback for streaming SDK events upstream.

Returns:
Tuple of (final_response, total_tokens_used, is_partial).
Expand All @@ -915,6 +925,13 @@ async def _execute_agentic_loop(
iteration += 1
logger.debug(f"Agentic loop iteration {iteration}/{max_iterations}")

# Emit turn start event
if event_callback:
try:
event_callback("agent_turn_start", {"turn": iteration})
except Exception:
logger.debug("Error in event_callback for agent_turn_start", exc_info=True)

# Check for mid-agent interrupt at top of each iteration
if interrupt_signal is not None and interrupt_signal.is_set():
interrupt_signal.clear()
Expand Down Expand Up @@ -957,6 +974,15 @@ async def _execute_agentic_loop(
output_tokens = getattr(response.usage, "output_tokens", 0)
total_tokens += input_tokens + output_tokens

# Emit agent_message events for text blocks in the response
if event_callback:
for block in response.content:
if hasattr(block, "type") and block.type == "text" and block.text:
try:
event_callback("agent_message", {"content": block.text})
except Exception:
logger.debug("Error in event_callback for agent_message", exc_info=True)

# Check for tool_use blocks
tool_uses = [
block
Expand Down Expand Up @@ -998,6 +1024,21 @@ async def _execute_agentic_loop(
# Execute each MCP tool call
tool_results: list[dict[str, Any]] = []
for tool_use in mcp_tool_uses:
# Emit tool start event
if event_callback:
try:
arguments = (
str(dict(tool_use.input))[:500]
if hasattr(tool_use, "input") and tool_use.input
else None
)
event_callback(
"agent_tool_start",
{"tool_name": tool_use.name, "arguments": arguments},
)
except Exception:
logger.debug("Error in event_callback for agent_tool_start", exc_info=True)

try:
result = await self._mcp_manager.call_tool(
tool_use.name, dict(tool_use.input) if hasattr(tool_use, "input") else {}
Expand All @@ -1010,6 +1051,23 @@ async def _execute_agentic_loop(
}
)
logger.debug(f"MCP tool '{tool_use.name}' succeeded")

# Emit tool complete event (success)
if event_callback:
try:
event_callback(
"agent_tool_complete",
{
"tool_name": tool_use.name,
"result": str(result)[:500] if result else None,
},
)
except Exception:
logger.debug(
"Error in event_callback for agent_tool_complete",
exc_info=True,
)

except Exception as e:
logger.error(f"MCP tool '{tool_use.name}' failed: {e}")
tool_results.append(
Expand All @@ -1021,6 +1079,22 @@ async def _execute_agentic_loop(
}
)

# Emit tool complete event (failure)
if event_callback:
try:
event_callback(
"agent_tool_complete",
{
"tool_name": tool_use.name,
"result": f"Error: {e}",
},
)
except Exception:
logger.debug(
"Error in event_callback for agent_tool_complete",
exc_info=True,
)

# Build assistant message with the tool_use content
# We need to serialize the content blocks properly
assistant_content: list[dict[str, Any]] = []
Expand Down
Loading
Loading