Agent Event Types for Event Sourcing #175

@yasha-dev1

Overview

Agent Event Types define the event schema for recording AI agent execution in PyWorkflow's event-sourced architecture. This is the key differentiator of PyWorkflow Agents: every agent action (LLM call, tool execution, memory access) becomes an immutable event in the durable event log.

These events enable:

  • Deterministic Replay: Re-run agent workflows without API calls or costs
  • Full Audit Trail: Every decision, tool call, and LLM response is logged
  • Time-Travel Debugging: Inspect agent state at any point in execution
  • Crash Recovery: Resume agent workflows after worker failures
  • Cost Tracking: Measure token usage and API costs per workflow

Unlike traditional agent frameworks (LangGraph, CrewAI, AutoGen), which focus on runtime execution, PyWorkflow Agents treats each agent execution as an event-sourced state machine. The design is inspired by Temporal, Azure Durable Functions, and modern observability platforms.

Architecture

Agent events extend PyWorkflow's existing event system (pyworkflow/engine/events.py) with agent-specific event types.

Agent Execution
    ↓
Event Log (append-only)
├── AGENT_STARTED
├── AGENT_LLM_CALL
├── AGENT_LLM_RESPONSE
├── AGENT_TOOL_CALL
├── AGENT_TOOL_RESULT
├── AGENT_TOOL_CALL
├── AGENT_TOOL_RESULT
├── AGENT_LLM_CALL
├── AGENT_LLM_RESPONSE
├── AGENT_RESPONSE
└── AGENT_COMPLETED
    ↓
Storage Backend (PostgreSQL, Redis, SQLite, File)
    ↓
Replay Engine
    ↓
Deterministic Re-execution (no API calls, zero cost)

Event Sourcing Flow

First Execution:

  1. Agent starts → record AGENT_STARTED
  2. Call LLM → record AGENT_LLM_CALL → receive response → record AGENT_LLM_RESPONSE
  3. LLM requests tool → record AGENT_TOOL_CALL → execute → record AGENT_TOOL_RESULT
  4. Repeat until final answer → record AGENT_RESPONSE → record AGENT_COMPLETED

Replay (after crash or for debugging):

  1. Load event log from storage
  2. Replay AGENT_LLM_RESPONSE events → use cached LLM responses (no API calls)
  3. Replay AGENT_TOOL_RESULT events → use cached tool results (no re-execution)
  4. Fast-forward to suspension point or completion
  5. Continue execution from last checkpoint

This pattern mirrors PyWorkflow's existing step replay mechanism but extends it to agentic loops.
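
The first-execution and replay paths above can be sketched with a toy in-memory log. This is a minimal illustration, not PyWorkflow's implementation: InMemoryLog, CountingLLM, build_llm_cache, and llm_call_or_replay are hypothetical names, and the fake LLM only counts how often it is invoked.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass
class InMemoryLog:
    """Hypothetical append-only log standing in for a storage backend."""
    events: list[dict[str, Any]] = field(default_factory=list)

    def append(self, event_type: str, data: dict[str, Any]) -> None:
        self.events.append(
            {"type": event_type, "sequence": len(self.events) + 1, "data": data}
        )


class CountingLLM:
    """Fake LLM client that counts how often it is actually called."""

    def __init__(self) -> None:
        self.calls = 0

    async def complete(self, messages: list[dict[str, str]]) -> dict[str, Any]:
        self.calls += 1
        return {"content": "PyWorkflow is a workflow framework.", "finish_reason": "stop"}


def build_llm_cache(log: InMemoryLog) -> dict[str, dict[str, Any]]:
    """Index recorded LLM responses by iteration, as a replay pass would."""
    return {
        f"llm_call_{e['data']['iteration']}": e["data"]
        for e in log.events
        if e["type"] == "agent_llm_response"
    }


async def llm_call_or_replay(
    log: InMemoryLog,
    cache: dict[str, dict[str, Any]],
    iteration: int,
    messages: list[dict[str, str]],
    llm: CountingLLM,
) -> dict[str, Any]:
    """Return the recorded response if one exists, else call the LLM and record it."""
    key = f"llm_call_{iteration}"
    if key in cache:  # replay path: no API call, nothing new is logged
        return cache[key]
    log.append("agent_llm_call", {"iteration": iteration, "messages": messages})
    response = await llm.complete(messages)
    recorded = {"iteration": iteration, **response}
    log.append("agent_llm_response", recorded)
    return recorded


async def demo() -> tuple[dict[str, Any], dict[str, Any], int, int]:
    log, llm = InMemoryLog(), CountingLLM()
    messages = [{"role": "user", "content": "What is PyWorkflow?"}]
    first = await llm_call_or_replay(log, {}, 1, messages, llm)
    replayed = await llm_call_or_replay(log, build_llm_cache(log), 1, messages, llm)
    return first, replayed, llm.calls, len(log.events)
```

Running asyncio.run(demo()) executes the same iteration twice: the second pass is served from the cache built from the log, so the fake LLM is called once and no new events are appended.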

Reference Implementations

Industry research in 2025 demonstrates the convergence of AI agents and event-driven observability:

Event-Driven AI Agent Frameworks

Graphite

AI Agent Observability Platforms (2025)

OpenTelemetry GenAI Semantic Conventions

Event Data Structure

  • Agent Observability: Can the Old Playbook Handle the New Game? (Greptime)
  • Agents capture events with high cardinality (trace_id, session_id) and high dimensionality (dozens of fields)
  • Event fields: timestamps, agent names, models, token counts, latency, prompts, responses, tool calls, reasoning, memory state
  • Events log detailed moments during model execution (user prompts, model responses), giving a granular view of agent interactions

Tracing Standards

Microsoft Agent Framework

Industry Resources

Proposed Implementation

Event Type Enum Extension

Add to pyworkflow/engine/events.py:

class EventType(str, Enum):
    # Existing workflow events
    WORKFLOW_STARTED = "workflow_started"
    WORKFLOW_COMPLETED = "workflow_completed"
    STEP_STARTED = "step_started"
    STEP_COMPLETED = "step_completed"
    SLEEP_STARTED = "sleep_started"
    SLEEP_COMPLETED = "sleep_completed"
    HOOK_CREATED = "hook_created"
    HOOK_RECEIVED = "hook_received"
    # ... existing events ...
    
    # Agent events (NEW)
    AGENT_STARTED = "agent_started"
    AGENT_LLM_CALL = "agent_llm_call"
    AGENT_LLM_RESPONSE = "agent_llm_response"
    AGENT_TOOL_CALL = "agent_tool_call"
    AGENT_TOOL_RESULT = "agent_tool_result"
    AGENT_RESPONSE = "agent_response"
    AGENT_COMPLETED = "agent_completed"
    AGENT_ERROR = "agent_error"
    AGENT_HANDOFF = "agent_handoff"  # For multi-agent (Phase 3)
    
    # Memory events (Phase 2)
    MEMORY_STORED = "memory_stored"
    MEMORY_RETRIEVED = "memory_retrieved"
    MEMORY_COMPACTED = "memory_compacted"
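
Because EventType mixes in str, its members should serialize to plain JSON strings and can be looked up again by value. The sketch below uses a trimmed-down copy of the enum to show that round trip; it assumes events are stored as JSON, which the schemas in this issue suggest but do not state outright.

```python
import json
from enum import Enum


class EventType(str, Enum):
    # Trimmed copy of the proposed enum, for illustration only.
    AGENT_STARTED = "agent_started"
    AGENT_TOOL_CALL = "agent_tool_call"


# A str-mixin enum member is encoded by json as its string value.
encoded = json.dumps({"type": EventType.AGENT_TOOL_CALL})

# Looking up by value restores the enum member when reading the log back.
decoded = EventType(json.loads(encoded)["type"])

print(encoded)  # {"type": "agent_tool_call"}
print(decoded is EventType.AGENT_TOOL_CALL)  # True
```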

Event Schemas

AGENT_STARTED

{
    "run_id": "run_abc123",
    "type": "agent_started",
    "timestamp": "2026-02-14T10:30:00Z",
    "sequence": 42,
    "data": {
        "agent_name": "research_agent",
        "system_prompt": "You are a helpful research assistant.",
        "user_query": "What is PyWorkflow?",
        "model": "claude-sonnet-4-5-20250929",
        "tools": ["web_search", "calculator"],
        "max_iterations": 20,
        "temperature": 0.7
    }
}
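
Every schema in this section shares the same envelope: run_id, type, timestamp, sequence, and a type-specific data payload. One way to model that envelope is a frozen dataclass, sketched below. AgentEvent and its methods are hypothetical names; the existing Event class in pyworkflow/engine/events.py may look different.

```python
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any


def _utc_now() -> str:
    """Current UTC time in the 'YYYY-MM-DDTHH:MM:SSZ' form used by the schemas."""
    now = datetime.now(timezone.utc).isoformat(timespec="seconds")
    return now.replace("+00:00", "Z")


@dataclass(frozen=True)
class AgentEvent:
    """Hypothetical envelope mirroring the fields of the schema above."""
    run_id: str
    type: str
    sequence: int
    data: dict[str, Any]
    timestamp: str = field(default_factory=_utc_now)

    def to_json(self) -> str:
        return json.dumps(asdict(self), sort_keys=True)

    @classmethod
    def from_json(cls, raw: str) -> "AgentEvent":
        return cls(**json.loads(raw))


started = AgentEvent(
    run_id="run_abc123",
    type="agent_started",
    sequence=42,
    data={"agent_name": "research_agent", "tools": ["web_search", "calculator"]},
    timestamp="2026-02-14T10:30:00Z",
)
restored = AgentEvent.from_json(started.to_json())
```

Note that frozen=True only prevents reassigning fields; the data dict itself stays mutable, so a real implementation would need to copy or freeze the payload to keep events immutable.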

AGENT_LLM_CALL

{
    "run_id": "run_abc123",
    "type": "agent_llm_call",
    "timestamp": "2026-02-14T10:30:01Z",
    "sequence": 43,
    "data": {
        "iteration": 1,
        "model": "claude-sonnet-4-5-20250929",
        "messages_count": 2,
        "messages": [
            {"role": "system", "content": "You are helpful."},
            {"role": "user", "content": "What is PyWorkflow?"}
        ],
        "tools": ["web_search"],
        "temperature": 0.7,
        "max_tokens": 4096
    }
}

AGENT_LLM_RESPONSE

{
    "run_id": "run_abc123",
    "type": "agent_llm_response",
    "timestamp": "2026-02-14T10:30:03Z",
    "sequence": 44,
    "data": {
        "iteration": 1,
        "model": "claude-sonnet-4-5-20250929",
        "content": "I'll search for information about PyWorkflow.",
        "tool_calls": [
            {
                "id": "call_123",
                "name": "web_search",
                "arguments": {"query": "PyWorkflow durable workflow orchestration"}
            }
        ],
        "finish_reason": "tool_calls",
        "usage": {
            "input_tokens": 150,
            "output_tokens": 45,
            "total_tokens": 195
        },
        "latency_ms": 1200
    }
}

AGENT_TOOL_CALL

{
    "run_id": "run_abc123",
    "type": "agent_tool_call",
    "timestamp": "2026-02-14T10:30:03Z",
    "sequence": 45,
    "data": {
        "tool_call_id": "call_123",
        "tool_name": "web_search",
        "arguments": {"query": "PyWorkflow durable workflow orchestration"},
        "iteration": 1
    }
}

AGENT_TOOL_RESULT

{
    "run_id": "run_abc123",
    "type": "agent_tool_result",
    "timestamp": "2026-02-14T10:30:05Z",
    "sequence": 46,
    "data": {
        "tool_call_id": "call_123",
        "tool_name": "web_search",
        "result": "PyWorkflow is a Python implementation of durable, event-sourced workflow orchestration...",
        "error": null,
        "latency_ms": 850,
        "iteration": 1
    }
}
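
AGENT_TOOL_CALL and AGENT_TOOL_RESULT are linked by tool_call_id, so a recovery pass can tell which tool calls were started but never finished. The helper below is a minimal sketch over plain dicts shaped like the schemas above; pending_tool_calls is a hypothetical name, not an existing PyWorkflow function.

```python
from typing import Any


def pending_tool_calls(events: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return AGENT_TOOL_CALL payloads that have no matching AGENT_TOOL_RESULT."""
    pending: dict[str, dict[str, Any]] = {}
    for event in sorted(events, key=lambda e: e["sequence"]):
        data = event["data"]
        if event["type"] == "agent_tool_call":
            pending[data["tool_call_id"]] = data
        elif event["type"] == "agent_tool_result":
            pending.pop(data["tool_call_id"], None)
    return list(pending.values())


log = [
    {"type": "agent_tool_call", "sequence": 45,
     "data": {"tool_call_id": "call_123", "tool_name": "web_search"}},
    {"type": "agent_tool_result", "sequence": 46,
     "data": {"tool_call_id": "call_123", "tool_name": "web_search", "error": None}},
    {"type": "agent_tool_call", "sequence": 47,
     "data": {"tool_call_id": "call_456", "tool_name": "calculator"}},
]
unfinished = pending_tool_calls(log)
```

Here unfinished contains only call_456, the call a resumed worker would still have to execute.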

AGENT_RESPONSE

{
    "run_id": "run_abc123",
    "type": "agent_response",
    "timestamp": "2026-02-14T10:30:08Z",
    "sequence": 47,
    "data": {
        "content": "PyWorkflow is a Python framework for durable workflow orchestration...",
        "iteration": 2,
        "is_final": true
    }
}

AGENT_COMPLETED

The total_cost_usd field is calculated from token usage and pricing.

{
    "run_id": "run_abc123",
    "type": "agent_completed",
    "timestamp": "2026-02-14T10:30:08Z",
    "sequence": 48,
    "data": {
        "iterations": 2,
        "tool_calls_made": 1,
        "total_tokens": {
            "input": 300,
            "output": 120,
            "total": 420
        },
        "total_cost_usd": 0.0025,
        "finish_reason": "stop",
        "latency_ms": 8000
    }
}
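
A cost figure like total_cost_usd could be derived from the token totals and a configurable price table. The sketch below shows one way to do that with Decimal arithmetic. ModelPricing, estimate_cost_usd, and especially the per-million-token rates are placeholders chosen for illustration; they are not real prices and are not meant to reproduce the 0.0025 in the example above.

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass(frozen=True)
class ModelPricing:
    """Hypothetical price table entry, in USD per one million tokens."""
    input_per_million: Decimal
    output_per_million: Decimal


def estimate_cost_usd(total_tokens: dict[str, int], pricing: ModelPricing) -> Decimal:
    """Estimate cost from the 'input' and 'output' counts of an AGENT_COMPLETED event."""
    million = Decimal(1_000_000)
    input_cost = pricing.input_per_million * total_tokens["input"] / million
    output_cost = pricing.output_per_million * total_tokens["output"] / million
    return input_cost + output_cost


# Placeholder rates, NOT actual provider pricing.
pricing = ModelPricing(input_per_million=Decimal("3.00"), output_per_million=Decimal("15.00"))
cost = estimate_cost_usd({"input": 300, "output": 120, "total": 420}, pricing)
```

Decimal avoids the rounding drift that accumulates when many small float costs are summed across runs; the value can be converted to float when the event is serialized.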

AGENT_ERROR

{
    "run_id": "run_abc123",
    "type": "agent_error",
    "timestamp": "2026-02-14T10:30:08Z",
    "sequence": 49,
    "data": {
        "error": "RateLimitError: API rate limit exceeded",
        "error_type": "RateLimitError",
        "iteration": 3,
        "stack_trace": "Traceback (most recent call last)...",
        "recoverable": true,
        "retry_after": 60
    }
}
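
The recoverable and retry_after fields give a worker enough information to decide whether and when to retry. A minimal sketch of that decision follows. It assumes retry_after is expressed in seconds, which the schema does not state, and retry_delay_seconds and default_delay are hypothetical names.

```python
from typing import Any, Optional


def retry_delay_seconds(
    error_data: dict[str, Any], default_delay: float = 5.0
) -> Optional[float]:
    """Return how long to wait before retrying, or None if the error is not recoverable."""
    if not error_data.get("recoverable", False):
        return None
    retry_after = error_data.get("retry_after")
    # Assumption: retry_after is in seconds; fall back to a default when absent.
    return float(retry_after) if retry_after is not None else default_delay


rate_limited = {"error_type": "RateLimitError", "recoverable": True, "retry_after": 60}
fatal = {"error_type": "ValueError", "recoverable": False}

print(retry_delay_seconds(rate_limited))  # 60.0
print(retry_delay_seconds(fatal))         # None
```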

AGENT_HANDOFF (Phase 3: Multi-Agent)

{
    "run_id": "run_abc123",
    "type": "agent_handoff",
    "timestamp": "2026-02-14T10:30:10Z",
    "sequence": 50,
    "data": {
        "from_agent": "supervisor",
        "to_agent": "research_specialist",
        "handoff_reason": "Delegating research task to specialist",
        "context": {"query": "Deep technical question", "priority": "high"},
        "iteration": 5
    }
}

MEMORY_STORED (Phase 2: Memory)

The memory_type field is one of conversation, persistent, summary, or episodic.

{
    "run_id": "run_abc123",
    "type": "memory_stored",
    "timestamp": "2026-02-14T10:30:11Z",
    "sequence": 51,
    "data": {
        "memory_type": "episodic",
        "memory_id": "mem_xyz789",
        "content": "User asked about PyWorkflow, responded with overview",
        "metadata": {"importance": 0.8, "tags": ["technical", "documentation"]},
        "embedding_model": "text-embedding-3-small"
    }
}

MEMORY_RETRIEVED (Phase 2: Memory)

{
    "run_id": "run_abc123",
    "type": "memory_retrieved",
    "timestamp": "2026-02-14T10:30:12Z",
    "sequence": 52,
    "data": {
        "memory_type": "persistent",
        "query": "Previous conversations about PyWorkflow",
        "results_count": 3,
        "memories": [
            {"memory_id": "mem_abc", "content": "...", "relevance": 0.92},
            {"memory_id": "mem_def", "content": "...", "relevance": 0.85}
        ],
        "retrieval_method": "semantic_search"
    }
}

MEMORY_COMPACTED (Phase 2: Memory)

{
    "run_id": "run_abc123",
    "type": "memory_compacted",
    "timestamp": "2026-02-14T10:30:13Z",
    "sequence": 53,
    "data": {
        "original_messages_count": 20,
        "compacted_messages_count": 5,
        "compression_method": "llm_summary",
        "original_tokens": 5000,
        "compacted_tokens": 800,
        "summary": "Conversation summary: User learned about PyWorkflow features..."
    }
}

Event Replay Logic

Update pyworkflow/engine/replay.py to handle agent events:

class EventReplayer:
    async def _apply_event(self, ctx: WorkflowContext, event: Event):
        """Apply event to context during replay."""
        
        # Existing event handling
        if event.type == EventType.STEP_COMPLETED:
            ctx.step_results[event.data["step_id"]] = event.data["result"]
        
        # Agent event handling (NEW)
        elif event.type == EventType.AGENT_LLM_RESPONSE:
            # Cache LLM response for replay
            call_id = f"llm_call_{event.data['iteration']}"
            ctx.agent_llm_cache[call_id] = event.data
        
        elif event.type == EventType.AGENT_TOOL_RESULT:
            # Cache tool result for replay
            tool_call_id = event.data["tool_call_id"]
            ctx.agent_tool_cache[tool_call_id] = event.data
        
        elif event.type == EventType.MEMORY_RETRIEVED:
            # Cache memory retrieval
            memory_key = event.data.get("query", "")
            ctx.agent_memory_cache[memory_key] = event.data["memories"]
        
        # ... handle other agent events ...

Add to WorkflowContext:

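from dataclasses import dataclass, field
from typing import Any

@dataclass
class WorkflowContext:
    # ... existing fields ...
    
    # Agent-specific caches (NEW), filled by EventReplayer._apply_event()
    # Keyed by "llm_call_<iteration>"
    agent_llm_cache: dict[str, Any] = field(default_factory=dict)
    # Keyed by tool_call_id
    agent_tool_cache: dict[str, Any] = field(default_factory=dict)
    # Keyed by the retrieval query string
    agent_memory_cache: dict[str, Any] = field(default_factory=dict)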

Integration with PyWorkflow

Token Usage Tracking

Agent events enable token usage tracking across workflows:

from pyworkflow.engine.events import EventType

# Run inside an async function; `storage` is the configured storage backend.
events = await storage.get_events(run_id)

total_tokens = 0
total_cost = 0.0

# Each AGENT_COMPLETED event carries the totals for one agent execution.
for event in events:
    if event.type == EventType.AGENT_COMPLETED:
        total_tokens += event.data["total_tokens"]["total"]
        total_cost += event.data.get("total_cost_usd", 0.0)

print(f"Total tokens: {total_tokens}")
print(f"Total cost: ${total_cost:.4f}")

Time-Travel Debugging

Inspect agent state at any point:

# Replay up to event sequence 45
ctx = await replay_until_sequence(run_id, sequence=45)

# Inspect cached LLM responses
print(ctx.agent_llm_cache)

# Inspect cached tool results
print(ctx.agent_tool_cache)
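
The idea behind a bounded replay can be sketched without a storage backend: apply events in sequence order and stop once the requested sequence is passed. ReplayState and replay_events_until below are hypothetical stand-ins that operate on plain dicts; the proposed replay_until_sequence would load events by run_id and return a WorkflowContext instead.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ReplayState:
    """Hypothetical stand-in for the agent caches on WorkflowContext."""
    agent_llm_cache: dict[str, Any] = field(default_factory=dict)
    agent_tool_cache: dict[str, Any] = field(default_factory=dict)


def replay_events_until(events: list[dict[str, Any]], sequence: int) -> ReplayState:
    """Rebuild the caches from all events whose sequence is <= the given value."""
    state = ReplayState()
    for event in sorted(events, key=lambda e: e["sequence"]):
        if event["sequence"] > sequence:
            break
        data = event["data"]
        if event["type"] == "agent_llm_response":
            state.agent_llm_cache[f"llm_call_{data['iteration']}"] = data
        elif event["type"] == "agent_tool_result":
            state.agent_tool_cache[data["tool_call_id"]] = data
    return state


events = [
    {"type": "agent_llm_response", "sequence": 44, "data": {"iteration": 1}},
    {"type": "agent_tool_call", "sequence": 45, "data": {"tool_call_id": "call_123"}},
    {"type": "agent_tool_result", "sequence": 46, "data": {"tool_call_id": "call_123"}},
]
at_45 = replay_events_until(events, sequence=45)
at_46 = replay_events_until(events, sequence=46)
```

With the example log from this issue, the state at sequence 45 holds the first LLM response but no tool result yet; the result for call_123 only appears once replay reaches sequence 46.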

Acceptance Criteria

  • Agent event types added to EventType enum
  • Event schemas documented for all agent events
  • AGENT_STARTED, AGENT_LLM_CALL, AGENT_LLM_RESPONSE events
  • AGENT_TOOL_CALL, AGENT_TOOL_RESULT events (integrate with ToolRegistry)
  • AGENT_RESPONSE, AGENT_COMPLETED, AGENT_ERROR events
  • AGENT_HANDOFF event (for Phase 3 multi-agent)
  • MEMORY_STORED, MEMORY_RETRIEVED, MEMORY_COMPACTED events (for Phase 2)
  • Event replay logic in EventReplayer._apply_event()
  • WorkflowContext extended with agent caches (agent_llm_cache, agent_tool_cache, agent_memory_cache)
  • Token usage tracking from events
  • Cost calculation from token usage (configurable pricing)
  • Unit tests for event creation and serialization
  • Integration test: event replay for agent workflow
  • Documentation: event schema reference
