Skip to content

Tool Framework (@tool decorator + ToolRegistry) #166

@yasha-dev1

Description

@yasha-dev1

Overview

The Tool Framework enables defining, registering, and executing agent tools (functions the LLM can call). It consists of:

  1. @tool decorator: Converts Python functions to LLM-callable tools with automatic schema generation
  2. ToolRegistry: Central registry for tool discovery and execution
  3. Event integration: Tool calls recorded as events for replay and auditability

This design follows proven patterns from LangChain, Pydantic AI, and the Claude Agent SDK while adding PyWorkflow's unique event-sourcing capabilities.

Architecture

The tool framework sits between agent logic and business functions, translating LLM tool requests into Python function calls.

Agent Execution Loop
    ↓
LLM Response (tool_calls: [{name: "search", args: {...}}])
    ↓
ToolRegistry.execute(name="search", **args)
    ↓
Record AGENT_TOOL_CALL event
    ↓
Execute registered Python function
    ↓
Record AGENT_TOOL_RESULT event
    ↓
Return result to agent loop

Event Sourcing Integration

Key Innovation: Tool execution is event-sourced, enabling deterministic replay.

On first execution:

  1. Record AGENT_TOOL_CALL event (tool name, arguments, timestamp)
  2. Execute Python function
  3. Record AGENT_TOOL_RESULT event (result, latency, error if any)

On replay:

  1. Detect existing AGENT_TOOL_RESULT event
  2. Return cached result without re-executing function
  3. Skip side effects (API calls, database writes, etc.)

This ensures agent workflows are deterministic and replayable—critical for debugging and crash recovery.

Reference Implementations

Leading AI frameworks in 2025 demonstrate proven tool patterns:

LangChain BaseTool/StructuredTool

Pydantic AI

  • Pydantic AI
  • Pydantic AI GitHub
  • @agent.tool decorator registers functions LLM may call while responding
  • Uses Pydantic models for type-safe tool parameters

Microsoft Agent Framework

Agent-Tooling Library

  • agent-tooling PyPI
  • Decorator-based tool registration with metadata capture and schema generation

Industry Resources

Proposed Implementation

Core Types

from typing import Callable, Any, Optional
from dataclasses import dataclass
from pydantic import BaseModel, Field
import inspect
import json

@dataclass
class ToolDefinition:
    """LLM tool definition (JSON Schema format)."""
    name: str
    description: str
    parameters: dict  # JSON Schema
    
    def to_dict(self) -> dict:
        """Convert to OpenAI/Anthropic tool format."""
        return {
            "name": self.name,
            "description": self.description,
            "parameters": self.parameters
        }

@dataclass
class ToolResult:
    """Result from tool execution."""
    tool_call_id: str
    name: str
    result: Any
    error: Optional[str] = None
    latency_ms: Optional[float] = None

@tool Decorator

from functools import wraps

def tool(
    name: Optional[str] = None,
    description: Optional[str] = None,
    registry: Optional["ToolRegistry"] = None
):
    """
    Decorator to convert a function into an LLM-callable tool.
    
    Args:
        name: Tool name (defaults to function name)
        description: Tool description (defaults to docstring)
        registry: ToolRegistry to register with (defaults to global registry)
    
    Example:
        @tool
        async def web_search(query: str) -> str:
            '''Search the web for information.'''
            async with httpx.AsyncClient() as client:
                response = await client.get(f"https://api.search.com?q={query}")
                return response.text
    """
    def decorator(func: Callable):
        # Extract metadata
        tool_name = name or func.__name__
        tool_description = description or (inspect.getdoc(func) or "No description")
        
        # Generate JSON Schema from type hints
        sig = inspect.signature(func)
        schema = _generate_schema_from_signature(sig)
        
        # Create tool definition
        tool_def = ToolDefinition(
            name=tool_name,
            description=tool_description,
            parameters=schema
        )
        
        # Register tool
        reg = registry or get_global_registry()
        reg.register(tool_name, func, tool_def)
        
        # Wrap function to add metadata
        @wraps(func)
        async def wrapper(*args, **kwargs):
            return await func(*args, **kwargs)
        
        wrapper._is_tool = True
        wrapper._tool_definition = tool_def
        
        return wrapper
    
    return decorator

def _generate_schema_from_signature(sig: inspect.Signature) -> dict:
    """
    Generate JSON Schema from function signature.
    
    Uses type hints to create schema compatible with OpenAI/Anthropic.
    """
    properties = {}
    required = []
    
    for param_name, param in sig.parameters.items():
        if param_name == "self":
            continue
        
        # Extract type
        param_type = param.annotation
        if param_type == inspect.Parameter.empty:
            param_type = str  # Default to string
        
        # Convert Python type to JSON Schema type
        schema_type = _python_type_to_json_schema(param_type)
        properties[param_name] = schema_type
        
        # Check if required
        if param.default == inspect.Parameter.empty:
            required.append(param_name)
    
    return {
        "type": "object",
        "properties": properties,
        "required": required
    }

def _python_type_to_json_schema(py_type) -> dict:
    """Convert Python type hint to JSON Schema."""
    if py_type == str:
        return {"type": "string"}
    elif py_type == int:
        return {"type": "integer"}
    elif py_type == float:
        return {"type": "number"}
    elif py_type == bool:
        return {"type": "boolean"}
    elif hasattr(py_type, "__origin__"):  # Generic types (List, Dict, etc.)
        origin = py_type.__origin__
        if origin == list:
            return {"type": "array"}
        elif origin == dict:
            return {"type": "object"}
    
    # Default to string
    return {"type": "string"}

ToolRegistry

from typing import Dict, Callable
from pyworkflow.context import get_context
from pyworkflow.engine.events import Event, EventType
import time

class ToolRegistry:
    """Registry for managing agent tools."""
    
    def __init__(self):
        self._tools: Dict[str, Callable] = {}
        self._definitions: Dict[str, ToolDefinition] = {}
    
    def register(self, name: str, func: Callable, definition: ToolDefinition):
        """Register a tool function."""
        self._tools[name] = func
        self._definitions[name] = definition
    
    def get_tool_definitions(self) -> list[ToolDefinition]:
        """Get all registered tool definitions (for LLM)."""
        return list(self._definitions.values())
    
    def get_tool_definitions_dict(self) -> list[dict]:
        """Get tool definitions as dicts (OpenAI/Anthropic format)."""
        return [d.to_dict() for d in self._definitions.values()]
    
    async def execute(
        self,
        tool_call_id: str,
        tool_name: str,
        arguments: dict
    ) -> ToolResult:
        """
        Execute a tool call.
        
        Records AGENT_TOOL_CALL and AGENT_TOOL_RESULT events for replay.
        """
        ctx = get_context()
        
        # Check if tool exists
        if tool_name not in self._tools:
            return ToolResult(
                tool_call_id=tool_call_id,
                name=tool_name,
                result=None,
                error=f"Tool '{tool_name}' not found"
            )
        
        # Check if result cached (replay mode)
        cached_result = self._get_cached_result(ctx, tool_call_id)
        if cached_result is not None:
            return cached_result
        
        # Record tool call event
        await ctx.storage.record_event(Event(
            run_id=ctx.run_id,
            type=EventType.AGENT_TOOL_CALL,
            data={
                "tool_call_id": tool_call_id,
                "tool_name": tool_name,
                "arguments": arguments
            }
        ))
        
        # Execute tool
        start_time = time.time()
        try:
            func = self._tools[tool_name]
            
            # Call function (handle both sync and async)
            if inspect.iscoroutinefunction(func):
                result = await func(**arguments)
            else:
                result = func(**arguments)
            
            latency_ms = (time.time() - start_time) * 1000
            
            # Record success event
            await ctx.storage.record_event(Event(
                run_id=ctx.run_id,
                type=EventType.AGENT_TOOL_RESULT,
                data={
                    "tool_call_id": tool_call_id,
                    "tool_name": tool_name,
                    "result": result,
                    "latency_ms": latency_ms,
                    "error": None
                }
            ))
            
            return ToolResult(
                tool_call_id=tool_call_id,
                name=tool_name,
                result=result,
                latency_ms=latency_ms
            )
        
        except Exception as e:
            latency_ms = (time.time() - start_time) * 1000
            error_msg = str(e)
            
            # Record error event
            await ctx.storage.record_event(Event(
                run_id=ctx.run_id,
                type=EventType.AGENT_TOOL_RESULT,
                data={
                    "tool_call_id": tool_call_id,
                    "tool_name": tool_name,
                    "result": None,
                    "latency_ms": latency_ms,
                    "error": error_msg
                }
            ))
            
            return ToolResult(
                tool_call_id=tool_call_id,
                name=tool_name,
                result=None,
                error=error_msg,
                latency_ms=latency_ms
            )
    
    def _get_cached_result(self, ctx, tool_call_id: str) -> Optional[ToolResult]:
        """Check if tool result is cached in events (replay mode)."""
        for event in ctx.events:
            if (event.type == EventType.AGENT_TOOL_RESULT and
                event.data.get("tool_call_id") == tool_call_id):
                
                return ToolResult(
                    tool_call_id=tool_call_id,
                    name=event.data["tool_name"],
                    result=event.data.get("result"),
                    error=event.data.get("error"),
                    latency_ms=event.data.get("latency_ms")
                )
        
        return None

# Global registry instance
_global_registry = None

def get_global_registry() -> ToolRegistry:
    """Get the global tool registry."""
    global _global_registry
    if _global_registry is None:
        _global_registry = ToolRegistry()
    return _global_registry

Usage Examples

Basic Tool

from pyworkflow.agents import tool

@tool
async def web_search(query: str) -> str:
    """Search the web for information."""
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.search.com?q={query}")
        return response.text

@tool
def calculator(expression: str) -> float:
    """Evaluate a mathematical expression."""
    # Safely evaluate (use a proper parser in production)
    return eval(expression)

Tool with Custom Metadata

@tool(
    name="get_weather",
    description="Get current weather for a location"
)
async def fetch_weather(location: str, units: str = "celsius") -> dict:
    """Fetch weather data from API."""
    # Implementation
    return {"temp": 22, "conditions": "sunny"}

Using Tools in Agent

from pyworkflow import agent

@agent(tools=[web_search, calculator])
async def research_agent(query: str):
    """Research assistant with web search and calculation."""
    pass

Integration with PyWorkflow

Event Types

Two new event types added to pyworkflow/engine/events.py:

class EventType(str, Enum):
    # ... existing events ...
    AGENT_TOOL_CALL = "agent_tool_call"      # Tool invocation requested
    AGENT_TOOL_RESULT = "agent_tool_result"  # Tool execution completed

Replay Behavior

During workflow replay:

  1. AGENT_TOOL_CALL event encountered
  2. Registry checks for matching AGENT_TOOL_RESULT event
  3. If found, return cached result (skip execution)
  4. If not found, execute tool and record result

This ensures deterministic replay without re-executing side effects.

Acceptance Criteria

  • ToolDefinition and ToolResult types defined
  • @tool decorator with automatic schema generation from type hints
  • ToolRegistry class with register/execute methods
  • Global registry instance (get_global_registry())
  • Event integration: AGENT_TOOL_CALL and AGENT_TOOL_RESULT events
  • Replay-safe execution (cached results from events)
  • Support for both sync and async tools
  • Error handling and error events
  • Unit tests for decorator, registry, and event replay
  • Integration test: tool execution + replay
  • Documentation with examples
  • JSON Schema generation from Python type hints (str, int, float, bool, List, Dict)

References

Metadata

Metadata

Assignees

No one assigned

    Labels

    agentsAI Agent module (pyworkflow_agents)epicLarge initiative containing multiple related issuesfeatureFeature to be implemented

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions