Overview
The @agent decorator and Agent base class provide the core abstraction for AI agents in PyWorkflow. Following PyWorkflow's established dual API pattern (used in @workflow/Workflow and @step/Step), agents support both functional (decorator-based) and object-oriented (class-based) interfaces.
This design enables:
- Declarative agent definition with system prompts, tools, and model configuration
- Event-sourced execution where every agent action becomes a durable event
- Seamless PyWorkflow integration as workflow steps or standalone executors
- Provider-agnostic model access via the LLM abstraction layer
Architecture
The agent decorator/base class sits at the top of the agent stack, orchestrating the agentic loop (prompt → LLM → tool calls → repeat).
@agent decorator / Agent base class
↓
Agent Execution Loop (max_iterations)
├── 1. Build message list (system + user + history)
├── 2. Call LLM (via BaseLLMProvider)
├── 3. Record AGENT_LLM_CALL + AGENT_LLM_RESPONSE events
├── 4. If tool_calls → execute via ToolRegistry
├── 5. Record AGENT_TOOL_CALL + AGENT_TOOL_RESULT events
├── 6. Append tool results to messages
├── 7. Repeat or return final response
↓
AgentResult (content, messages, tool_calls_made, token_usage)
Event Sourcing Integration
Every agent action is recorded as an event:
- `AGENT_STARTED` - Agent begins execution
- `AGENT_LLM_CALL` - LLM API call initiated
- `AGENT_LLM_RESPONSE` - LLM response received
- `AGENT_TOOL_CALL` - Tool execution requested (handled by ToolRegistry)
- `AGENT_TOOL_RESULT` - Tool result returned (handled by ToolRegistry)
- `AGENT_RESPONSE` - Agent produces final answer
- `AGENT_COMPLETED` - Agent finishes (success)
- `AGENT_ERROR` - Agent encounters error
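These event types could be sketched as additions to PyWorkflow's event enum. This is a hypothetical sketch: the enum name, string values, and use of a `str` mixin are assumptions, not the project's actual definitions.

```python
from enum import Enum

class EventType(str, Enum):
    """Hypothetical agent-related additions to PyWorkflow's event enum."""
    AGENT_STARTED = "agent_started"            # agent begins execution
    AGENT_LLM_CALL = "agent_llm_call"          # LLM API call initiated
    AGENT_LLM_RESPONSE = "agent_llm_response"  # LLM response received
    AGENT_TOOL_CALL = "agent_tool_call"        # tool execution requested
    AGENT_TOOL_RESULT = "agent_tool_result"    # tool result returned
    AGENT_RESPONSE = "agent_response"          # agent produces final answer
    AGENT_COMPLETED = "agent_completed"        # agent finishes (success)
    AGENT_ERROR = "agent_error"                # agent encounters error
```

The `str` mixin keeps event types JSON-serializable when recorded into the event log.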
On replay, cached LLM responses and tool results are used instead of re-executing API calls—ensuring deterministic, cost-free replay.
Reference Implementations
Leading 2025 AI frameworks demonstrate proven agent patterns:
Vercel AI SDK Agent Interface
- Provides high-level agent abstraction with built-in tool calling and streaming
- Reference for clean, declarative agent API
LangGraph create_react_agent
- How to Build Your First AI Agent in 2025: Step-by-Step with Python & LangGraph
- ReAct-style agent loop: Reason → Act → Observe
- Stateful graph architecture for agent control flow
Agent-Patterns Library
- Agent-Patterns GitHub
- Agent-Patterns Design Document
- Reusable base classes for common AI agent workflows using LangGraph and LangChain
- Modular base classes that define common agent operations, with specialized logic such as planning and reflection
OpenAI Agents SDK
- OpenAI Agents SDK - Agents
- OpenAI Agents SDK - Examples
- Official SDK for building OpenAI agents with tool integration
Multi-Agent Patterns
- Multi-Agent System Design Patterns From Scratch In Python (Medium)
- ReAct agents, hierarchical patterns, collaborative patterns
- Multi-Agent Collaboration Patterns with Strands Agents (AWS)
Python Decorator Patterns
- Custom Python Decorator Patterns Worth Copy-Pasting Forever (KDnuggets)
- Mastering Decorators in Base and Derived Python Classes
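The parameterized-decorator pattern that `@agent` relies on (a decorator factory plus `functools.wraps` to preserve the wrapped function's metadata) can be shown in isolation. All names here are illustrative, not part of PyWorkflow:

```python
import functools

def tagged(label: str):
    """Decorator factory: returns a decorator that attaches a label."""
    def decorator(func):
        @functools.wraps(func)  # preserves __name__ and __doc__ on the wrapper
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)
        wrapper._label = label  # metadata available for later introspection
        return wrapper
    return decorator

@tagged("agent")
def research(query: str) -> str:
    """You are a helpful research assistant."""
    return f"result for {query}"
```

Because of `functools.wraps`, `research.__doc__` still returns the original docstring, which is exactly the property that lets `@agent` lift a docstring into the system prompt.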
Proposed Implementation
Core Types
from dataclasses import dataclass
from typing import Optional, Any
from pyworkflow.agents.providers import BaseLLMProvider, Message
from pyworkflow.agents.tools import ToolDefinition
@dataclass
class AgentResult:
"""Result from agent execution."""
content: str # Final response text
messages: list[Message] # Full conversation history
tool_calls_made: int # Number of tool calls
token_usage: dict # {"input": int, "output": int, "total": int}
iterations: int # Number of agentic loop iterations
finish_reason: str # "stop", "max_iterations", "error"
error: Optional[str] = None  # Error message if failed
@agent Decorator (Functional API)
import inspect
from functools import wraps
from typing import Callable, Optional
from pyworkflow.context import get_context
from pyworkflow.agents.providers import BaseLLMProvider, AnthropicProvider
from pyworkflow.agents.tools import ToolRegistry, get_global_registry
def agent(
model: Optional[str] = None,
provider: Optional[BaseLLMProvider] = None,
tools: Optional[list[Callable]] = None,
system_prompt: Optional[str] = None,
max_iterations: int = 20,
temperature: float = 0.7,
registry: Optional[ToolRegistry] = None
):
"""
Decorator to convert a function into an AI agent.
The function's docstring becomes the system prompt (if not explicitly provided).
Args:
model: Model name (e.g., "claude-sonnet-4-5-20250929", "gpt-4o")
provider: LLM provider instance (overrides model)
tools: List of @tool decorated functions
system_prompt: System prompt (defaults to function docstring)
max_iterations: Maximum agentic loop iterations
temperature: LLM temperature
registry: ToolRegistry (defaults to global)
Example:
@agent(model="claude-sonnet-4-5-20250929", tools=[search, calc])
async def research_agent(query: str):
'''You are a helpful research assistant.'''
"""
def decorator(func: Callable):
# Extract system prompt from docstring if not provided
_system_prompt = system_prompt or (inspect.getdoc(func) or "You are a helpful assistant.")
# Create provider if not provided
_provider = provider or AnthropicProvider(model=model or "claude-sonnet-4-5-20250929")
# Get tool registry
_registry = registry or get_global_registry()
# Register tools if provided
tool_defs = []
if tools:
for tool in tools:
if hasattr(tool, "_tool_definition"):
tool_defs.append(tool._tool_definition)
@wraps(func)
async def wrapper(*args, **kwargs):
# Create agent instance
agent_instance = _Agent(
name=func.__name__,
system_prompt=_system_prompt,
provider=_provider,
tools=tool_defs,
max_iterations=max_iterations,
registry=_registry
)
# Execute agent (pass first arg as user query)
user_query = args[0] if args else kwargs.get("query", "")
return await agent_instance.execute(user_query)
wrapper._is_agent = True
wrapper._system_prompt = _system_prompt
wrapper._provider = _provider
return wrapper
return decorator
Agent Base Class (OOP API)
from abc import ABC, abstractmethod
class Agent(ABC):
"""
Base class for AI agents (OOP API).
Subclass and override `run()` to define agent behavior.
Example:
class ResearchAgent(Agent):
model = "claude-sonnet-4-5-20250929"
tools = [search_tool, calc_tool]
system_prompt = "You are a helpful research assistant."
max_iterations = 20
async def run(self, query: str) -> AgentResult:
return await self.execute(query)
"""
# Class attributes (configuration)
model: str = "claude-sonnet-4-5-20250929"
provider: Optional[BaseLLMProvider] = None
tools: list[Callable] = []
system_prompt: str = "You are a helpful assistant."
max_iterations: int = 20
temperature: float = 0.7
registry: Optional[ToolRegistry] = None
def __init__(self, **kwargs):
# Allow instance-level overrides
for key, value in kwargs.items():
setattr(self, key, value)
# Initialize provider
if self.provider is None:
self.provider = AnthropicProvider(model=self.model, temperature=self.temperature)
# Get registry
if self.registry is None:
self.registry = get_global_registry()
# Extract tool definitions
self.tool_defs = []
for tool in self.tools:
if hasattr(tool, "_tool_definition"):
self.tool_defs.append(tool._tool_definition)
@abstractmethod
async def run(self, *args, **kwargs) -> AgentResult:
"""
Override this method to define agent behavior.
Call `self.execute(query)` to run the agentic loop.
"""
pass
async def execute(self, user_query: str) -> AgentResult:
"""
Execute the agentic loop.
This is the core execution method called by both decorator and class APIs.
"""
return await _execute_agent_loop(
name=self.__class__.__name__,
system_prompt=self.system_prompt,
user_query=user_query,
provider=self.provider,
tools=self.tool_defs,
max_iterations=self.max_iterations,
registry=self.registry
)
Internal Agent Execution Loop
async def _execute_agent_loop(
name: str,
system_prompt: str,
user_query: str,
provider: BaseLLMProvider,
tools: list[ToolDefinition],
max_iterations: int,
registry: ToolRegistry
) -> AgentResult:
"""
Core agentic execution loop (ReAct-style).
Flow:
1. Build messages (system + user)
2. Loop until max_iterations or final answer:
a. Call LLM with current messages + tools
b. Record AGENT_LLM_CALL and AGENT_LLM_RESPONSE events
c. If tool_calls: execute tools, append results, continue
d. If no tool_calls: return final answer
"""
ctx = get_context()
# Record AGENT_STARTED event
await ctx.storage.record_event(Event(
run_id=ctx.run_id,
type=EventType.AGENT_STARTED,
data={
"agent_name": name,
"system_prompt": system_prompt,
"user_query": user_query,
"max_iterations": max_iterations
}
))
# Initialize conversation
messages = [
Message(role=MessageRole.SYSTEM, content=system_prompt),
Message(role=MessageRole.USER, content=user_query)
]
total_tokens = {"input": 0, "output": 0}
tool_calls_made = 0
iteration = 0
try:
while iteration < max_iterations:
iteration += 1
# Call LLM
await ctx.storage.record_event(Event(
run_id=ctx.run_id,
type=EventType.AGENT_LLM_CALL,
data={"iteration": iteration, "messages_count": len(messages)}
))
response = await provider.generate(messages, tools=tools)
# Record response
await ctx.storage.record_event(Event(
run_id=ctx.run_id,
type=EventType.AGENT_LLM_RESPONSE,
data={
"iteration": iteration,
"content": response.content,
"tool_calls": [{"name": tc.name, "args": tc.arguments} for tc in response.tool_calls],
"finish_reason": response.finish_reason,
"usage": response.usage
}
))
# Update token count
total_tokens["input"] += response.usage.get("input_tokens", 0)
total_tokens["output"] += response.usage.get("output_tokens", 0)
# Add assistant message
messages.append(Message(role=MessageRole.ASSISTANT, content=response.content))
# If no tool calls, we're done
if not response.tool_calls:
await ctx.storage.record_event(Event(
run_id=ctx.run_id,
type=EventType.AGENT_COMPLETED,
data={
"iterations": iteration,
"tool_calls_made": tool_calls_made,
"total_tokens": total_tokens
}
))
return AgentResult(
content=response.content,
messages=messages,
tool_calls_made=tool_calls_made,
token_usage={**total_tokens, "total": sum(total_tokens.values())},
iterations=iteration,
finish_reason="stop"
)
# Execute tool calls
for tool_call in response.tool_calls:
tool_calls_made += 1
# Execute via registry (handles events internally)
result = await registry.execute(
tool_call_id=tool_call.id,
tool_name=tool_call.name,
arguments=tool_call.arguments
)
# Add tool result to messages
result_content = str(result.result) if result.error is None else f"Error: {result.error}"
messages.append(Message(
role=MessageRole.TOOL,
content=result_content,
name=tool_call.name
))
# Max iterations reached
await ctx.storage.record_event(Event(
run_id=ctx.run_id,
type=EventType.AGENT_COMPLETED,
data={
"iterations": iteration,
"tool_calls_made": tool_calls_made,
"total_tokens": total_tokens,
"finish_reason": "max_iterations"
}
))
return AgentResult(
content=messages[-1].content if messages else "",
messages=messages,
tool_calls_made=tool_calls_made,
token_usage={**total_tokens, "total": sum(total_tokens.values())},
iterations=iteration,
finish_reason="max_iterations"
)
except Exception as e:
# Record error
await ctx.storage.record_event(Event(
run_id=ctx.run_id,
type=EventType.AGENT_ERROR,
data={"error": str(e), "iteration": iteration}
))
return AgentResult(
content="",
messages=messages,
tool_calls_made=tool_calls_made,
token_usage={**total_tokens, "total": sum(total_tokens.values())},
iterations=iteration,
finish_reason="error",
error=str(e)
)
Usage Examples
Decorator API
from pyworkflow import agent
from pyworkflow.agents import tool
@tool
async def web_search(query: str) -> str:
"""Search the web."""
# Implementation
return "Search results..."
@agent(model="claude-sonnet-4-5-20250929", tools=[web_search])
async def research_agent(query: str):
"""You are a helpful research assistant with web search."""
# Use in workflow
result = await research_agent("What is PyWorkflow?")
print(result.content)
Class API
class ResearchAgent(Agent):
model = "claude-sonnet-4-5-20250929"
tools = [web_search, calculator]
system_prompt = "You are a helpful research assistant."
max_iterations = 20
async def run(self, query: str) -> AgentResult:
return await self.execute(query)
# Use in workflow
agent = ResearchAgent()
result = await agent.run("What is PyWorkflow?")
print(result.content)
Integration with PyWorkflow
As Workflow Step
Agents integrate seamlessly as workflow steps:
from pyworkflow import workflow, step
@workflow(durable=True)
async def research_workflow(topic: str):
# Agent as step
research_result = await research_agent(topic)
# Regular step
summary = await summarize_step(research_result.content)
return summary
Event Replay
On workflow replay:
- `AGENT_LLM_RESPONSE` events provide cached LLM responses
- `AGENT_TOOL_RESULT` events provide cached tool results
- No API calls made, no costs incurred
- Deterministic execution guaranteed
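A minimal self-contained sketch of this replay behavior: the event payload shape and the lookup-by-iteration strategy are assumptions; the real implementation would read `AGENT_LLM_RESPONSE` events from PyWorkflow's event store.

```python
# Sketch: deterministic replay by consuming recorded events instead of
# calling the LLM again. A cache miss falls through to the live API.
recorded_events = [
    {"type": "AGENT_LLM_RESPONSE", "data": {"iteration": 1, "content": "Paris"}},
]

live_calls = 0  # counts real API calls, to show replay makes none

def call_llm_live(messages):
    global live_calls
    live_calls += 1
    return "Paris"  # stand-in for a real provider call

def call_llm(messages, iteration, replay_log):
    # On replay, return the cached response recorded for this iteration.
    for ev in replay_log:
        if ev["type"] == "AGENT_LLM_RESPONSE" and ev["data"]["iteration"] == iteration:
            return ev["data"]["content"]
    return call_llm_live(messages)  # first run (or cache miss): call the API

# Replay of iteration 1: cached response used, no API call made
answer = call_llm([{"role": "user", "content": "Capital of France?"}], 1, recorded_events)
```

Since the same event log always yields the same responses, replay is both deterministic and cost-free.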
Acceptance Criteria
- [ ] `AgentResult` type defined
- [ ] `@agent` decorator (functional API) with system_prompt, tools, max_iterations
- [ ] `Agent` base class (OOP API) with `run()` abstract method
- [ ] Internal `_execute_agent_loop()` function (ReAct-style)
- [ ] Event integration: `AGENT_STARTED`, `AGENT_LLM_CALL`, `AGENT_LLM_RESPONSE`, `AGENT_COMPLETED`, `AGENT_ERROR`
- [ ] System prompt extraction from docstring (decorator API)
- [ ] LLM provider integration (via `BaseLLMProvider`)
- [ ] Tool integration (via `ToolRegistry`)
- [ ] Max iterations safeguard
- [ ] Token usage tracking
- [ ] Error handling and `AGENT_ERROR` events
- [ ] Unit tests for decorator, base class, and execution loop
- [ ] Integration test: agent with tools executing in workflow
- [ ] Replay test: verify cached responses used on replay
- [ ] Documentation with examples for both APIs
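The docstring-extraction criterion could be unit-tested roughly as follows. This uses a simplified stand-in for the decorator's fallback chain (`extract_system_prompt` is a hypothetical helper, not PyWorkflow API); the real test would exercise `@agent` directly.

```python
import inspect

def extract_system_prompt(func, system_prompt=None):
    # Mirrors the decorator's fallback chain:
    # explicit argument -> function docstring -> generic default
    return system_prompt or (inspect.getdoc(func) or "You are a helpful assistant.")

async def documented(query: str):
    """You are a research assistant."""

async def undocumented(query: str): ...

assert extract_system_prompt(documented) == "You are a research assistant."
assert extract_system_prompt(undocumented) == "You are a helpful assistant."
assert extract_system_prompt(documented, "Override.") == "Override."
```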
References
- Agent-Patterns GitHub
- Agent-Patterns Design Document
- How to Build Your First AI Agent in 2025: Step-by-Step with Python & LangGraph
- LangChain AI Agents: Complete Implementation Guide 2025
- OpenAI Agents SDK - Agents
- OpenAI Agents SDK - Examples
- Multi-Agent System Design Patterns From Scratch In Python
- Multi-Agent Collaboration Patterns with Strands Agents (AWS)
- Custom Python Decorator Patterns Worth Copy-Pasting Forever
- Mastering Decorators in Base and Derived Python Classes