Overview
Episodic memory captures specific past interactions as complete episodes including situation, approach, outcome, and why the approach worked. Unlike semantic memory (facts) or procedural memory (how-to), episodic memory preserves the full context of past experiences, enabling agents to learn from successful (and failed) patterns.
PyWorkflow Advantage: PyWorkflow's event sourcing architecture means every past workflow run is already a complete episode. The event log captures the entire interaction trace with full temporal and causal information.
How It Works
Episode Structure:
Episode = {
    observation: "What was the situation?",
    thoughts: "What did the agent consider?",
    action: "What did the agent do?",
    result: "What happened?",
    success: true/false,
    why_it_worked: "Key insight"
}
Capture Flow:
- Workflow executes → Events recorded in event log
- Workflow completes → Extract episode from event sequence
- Generate embedding of situation + outcome
- Store episode with semantic index
- Future similar situation → Retrieve relevant episodes
- Apply successful pattern from past
Data Flow:
Workflow Execution → Event Log [COMPLETE TRACE]
↓
Episode Extraction
↓
Embed(situation + outcome)
↓
Store in Episodic Memory DB
New Situation → Embed(current_situation)
→ Semantic Search (past episodes)
→ Retrieve Top-K Similar Episodes
→ Extract Successful Patterns
→ Apply to Current Situation
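The retrieval half of this flow (embed → semantic search → top-k) can be sketched end to end. The `embed` helper below is a toy bag-of-words stand-in for a real embedding model, purely for illustration:

```python
import numpy as np

def embed(text: str, vocab) -> np.ndarray:
    # Toy bag-of-words embedding; a real system would call an embedding model.
    words = text.lower().split()
    return np.array([float(words.count(w)) for w in vocab])

vocab = ["refund", "order", "shipping", "delay", "invoice", "payment"]

# Past episodes: (situation + outcome) text, embedded once at store time.
episodes = [
    "refund requested for delayed order refund issued",
    "invoice payment failed retried payment successfully",
    "shipping delay on order expedited shipping offered",
]
vectors = np.stack([embed(e, vocab) for e in episodes])

# New situation → embed → cosine similarity → retrieve top-k episodes.
query = embed("customer wants refund for order stuck in shipping delay", vocab)
sims = vectors @ query / (
    np.linalg.norm(vectors, axis=1) * np.linalg.norm(query)
)
top_k = np.argsort(sims)[::-1][:2]
print([episodes[i] for i in top_k])
```

The shipping-delay episode ranks first because it shares the most situation terms with the query; the unrelated invoice episode scores zero.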
Reference Implementations
- LangChain LangMem Episodes - Episode captures situation, thoughts, action, result, and why it worked
- Episodic Memory in AI Agents (GeeksforGeeks) - Temporal sequence of past experiences
- MarkTechPost: Memory-Powered Agentic AI - Continuous learning through episodic experiences
- Memory-Driven AI Agents Tutorial (Feb 2026) - Complete implementation guide
- mem-agent: Equipping LLM Agents with Memory Using RL - Reinforcement learning for memory management
Proposed PyWorkflow Implementation
PyWorkflow is uniquely positioned for episodic memory because the event log IS the episodic trace:
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional, Dict, Any
import numpy as np
from pyworkflow import workflow, step, get_context
from pyworkflow.storage.base import StorageBackend
from pyworkflow.engine.events import Event, EventType

@dataclass
class Episode:
    """Single episodic memory from a past workflow run"""
    episode_id: str                # = run_id of the past workflow
    # Core episode components
    observation: str               # Situation/context at start
    thoughts: List[str]            # Agent reasoning steps
    actions: List[Dict[str, Any]]  # Steps executed
    result: Any                    # Outcome
    # Metadata
    success: bool                  # Did it achieve the goal?
    timestamp: datetime
    duration_seconds: float
    workflow_name: str
    # Learning
    why_it_worked: str             # Key insight (LLM-generated)
    embedding: np.ndarray          # For semantic search
    # Full trace (for deep inspection)
    event_log: List[Event]         # Complete event sequence

    def similarity(self, query_embedding: np.ndarray) -> float:
        """Cosine similarity to the current situation"""
        return np.dot(self.embedding, query_embedding) / (
            np.linalg.norm(self.embedding) * np.linalg.norm(query_embedding)
        )
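A quick sanity check on the similarity metric used above: cosine similarity depends only on a vector's direction, not its magnitude, which is why it is a reasonable default for comparing embeddings of different lengths and scales. A standalone sketch:

```python
import numpy as np

def cosine(a: np.ndarray, b: np.ndarray) -> float:
    # Same formula as Episode.similarity: dot product divided by the norms.
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

a = np.array([1.0, 2.0, 3.0])

# Scaling a vector leaves the score unchanged — only direction matters.
print(cosine(a, 10 * a))                    # ≈ 1.0
print(cosine(a, np.array([3.0, 2.0, 1.0])))  # ≈ 0.714 (partial overlap)
```

One caveat worth noting: a zero embedding makes both norms zero and the formula undefined, so production code should guard against empty inputs.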
class EpisodicMemory:
    """Episodic memory backed by PyWorkflow event logs"""

    def __init__(self, storage: StorageBackend):
        self.storage = storage
        # The embedding model (and call_llm below) are assumed to be
        # provided by the host application; any embedding/LLM client works.
        self.embedding_model = self._init_embedding_model()

    async def create_episode_from_run(self, run_id: str) -> Episode:
        """Extract an episode from a completed workflow run"""
        # Get workflow run metadata
        run = await self.storage.get_run(run_id)
        # Get the complete event log
        events = await self.storage.get_events(run_id)
        # Extract episode components from the events
        observation = self._extract_observation(events)
        thoughts = self._extract_thoughts(events)  # From step reasoning
        actions = self._extract_actions(events)    # From step executions
        result = run.result
        # Use an LLM to analyze why it worked (or failed)
        analysis_prompt = f"""
        Analyze this workflow execution and explain why it succeeded/failed:
        Observation: {observation}
        Actions: {actions}
        Result: {result}
        Success: {run.status == 'completed'}
        Provide a concise insight about what made this approach work or fail.
        """
        why_it_worked = await call_llm(analysis_prompt)
        # Generate an embedding of situation + outcome
        embedding_text = f"{observation}\n{result}\n{why_it_worked}"
        embedding = await self.embedding_model.embed(embedding_text)
        episode = Episode(
            episode_id=run_id,
            observation=observation,
            thoughts=thoughts,
            actions=actions,
            result=result,
            success=(run.status == "completed"),
            timestamp=run.created_at,
            duration_seconds=(run.completed_at - run.created_at).total_seconds(),
            workflow_name=run.workflow_name,
            why_it_worked=why_it_worked,
            embedding=embedding,
            event_log=events  # Full trace for inspection
        )
        # Store the episode
        await self._store_episode(episode)
        return episode
    def _extract_observation(self, events: List[Event]) -> str:
        """Extract the initial situation from events"""
        # Look for the workflow_started event
        started = next((e for e in events if e.type == EventType.WORKFLOW_STARTED), None)
        if started:
            return f"Workflow started with args: {started.data.get('args', {})}"
        return "Unknown initial situation"

    def _extract_thoughts(self, events: List[Event]) -> List[str]:
        """Extract agent reasoning from events"""
        thoughts = []
        for event in events:
            # Look for AGENT_MESSAGE events with reasoning
            if event.type == EventType.AGENT_MESSAGE:
                if event.data.get("role") == "assistant":
                    thoughts.append(event.data.get("content", ""))
            # Or custom AGENT_THOUGHT events
            elif event.type == EventType.AGENT_THOUGHT:
                thoughts.append(event.data.get("thought", ""))
        return thoughts

    def _extract_actions(self, events: List[Event]) -> List[Dict[str, Any]]:
        """Extract actions (steps) from events"""
        actions = []
        for event in events:
            if event.type == EventType.STEP_COMPLETED:
                actions.append({
                    "step_id": event.data.get("step_id"),
                    "step_name": event.data.get("step_name"),
                    "result": event.data.get("result")
                })
        return actions
    async def _store_episode(self, episode: Episode):
        """Store the episode in the storage backend"""
        # Store as a special event linked to the original run
        await self.storage.record_event(Event(
            run_id=episode.episode_id,
            type=EventType.EPISODE_CREATED,
            data={
                "episode_id": episode.episode_id,
                "observation": episode.observation,
                "thoughts": episode.thoughts,
                "actions": episode.actions,
                "result": episode.result,
                "success": episode.success,
                "workflow_name": episode.workflow_name,
                "why_it_worked": episode.why_it_worked,
                "embedding": episode.embedding.tolist(),
                "timestamp": episode.timestamp.isoformat(),
                "duration_seconds": episode.duration_seconds
            }
        ))
    async def retrieve_similar_episodes(
        self,
        current_situation: str,
        workflow_name: Optional[str] = None,
        top_k: int = 3,
        success_only: bool = True
    ) -> List[Episode]:
        """Find similar past episodes via semantic search"""
        # Embed the current situation
        query_embedding = await self.embedding_model.embed(current_situation)
        # Load past episodes
        episodes = await self._load_episodes(
            workflow_name=workflow_name,
            success_only=success_only
        )
        # Calculate similarities
        scored = [
            (episode, episode.similarity(query_embedding))
            for episode in episodes
        ]
        # Sort by similarity and return the top-k
        sorted_episodes = sorted(scored, key=lambda x: x[1], reverse=True)
        return [ep for ep, score in sorted_episodes[:top_k]]

    async def _load_episodes(
        self,
        workflow_name: Optional[str] = None,
        success_only: bool = True
    ) -> List[Episode]:
        """Load episodes from storage"""
        # Query for EPISODE_CREATED events
        events = await self.storage.get_events_by_type(EventType.EPISODE_CREATED)
        episodes = []
        for event in events:
            data = event.data
            # Filter by workflow name
            if workflow_name and data.get("workflow_name") != workflow_name:
                continue
            # Filter by success
            if success_only and not data.get("success"):
                continue
            episodes.append(Episode(
                episode_id=data["episode_id"],
                observation=data["observation"],
                thoughts=data["thoughts"],
                actions=data["actions"],
                result=data["result"],
                success=data["success"],
                timestamp=datetime.fromisoformat(data["timestamp"]),
                duration_seconds=data["duration_seconds"],
                workflow_name=data["workflow_name"],
                why_it_worked=data["why_it_worked"],
                embedding=np.array(data["embedding"]),
                event_log=[]  # Not loaded by default (can load on demand)
            ))
        return episodes
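Note the embedding round trip between these two methods: `_store_episode` serializes the vector with `.tolist()` so it fits in a JSON event payload, and `_load_episodes` rebuilds it with `np.array`. For float64 vectors this round trip is exact, as a quick standalone check confirms:

```python
import json
import numpy as np

embedding = np.array([0.1, -0.25, 0.333333, 1e-9])

# Store side: numpy array → plain Python list → JSON string (event payload).
payload = json.dumps({"embedding": embedding.tolist()})

# Load side: JSON string → list → numpy array.
restored = np.array(json.loads(payload)["embedding"])

print(np.array_equal(embedding, restored))  # → True
```

This works because Python's JSON encoder writes the shortest repr that round-trips each float64 exactly, so no precision is lost between store and load.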
@step()
async def learn_from_past_episodes(current_situation: str) -> str:
    """Retrieve and learn from similar past episodes"""
    ctx = get_context()
    episodic_memory = EpisodicMemory(storage=ctx.storage)
    # Find similar past episodes
    episodes = await episodic_memory.retrieve_similar_episodes(
        current_situation=current_situation,
        workflow_name=ctx.workflow_name,
        top_k=3,
        success_only=True
    )
    # Format the lessons learned
    lessons = ["Lessons from past similar situations:"]
    for i, ep in enumerate(episodes, 1):
        lessons.append(f"\n{i}. Episode from {ep.timestamp}")
        lessons.append(f"   Observation: {ep.observation}")
        lessons.append(f"   Actions: {[a['step_name'] for a in ep.actions]}")
        lessons.append(f"   Result: {ep.result}")
        lessons.append(f"   Key insight: {ep.why_it_worked}")
    return "\n".join(lessons)
@workflow(durable=True)
async def agent_with_episodic_memory(situation: str):
    """Agent that learns from past experiences"""
    # Retrieve lessons from similar past runs
    past_lessons = await learn_from_past_episodes(situation)
    # Generate a response informed by past episodes
    prompt = f"""
    Current situation: {situation}
    {past_lessons}
    Based on these past experiences, how should we approach this situation?
    """
    # generate_response is an LLM-backed step assumed to be defined elsewhere
    response = await generate_response(prompt)
    return response
# Automatic episode creation on workflow completion
@workflow.on_complete
async def create_episode(run_id: str):
    """Automatically create an episode from a completed workflow"""
    ctx = get_context()
    episodic_memory = EpisodicMemory(storage=ctx.storage)
    episode = await episodic_memory.create_episode_from_run(run_id)
    # Log episode creation
    print(f"Created episode {episode.episode_id}: {episode.why_it_worked}")

Integration with Event Sourcing
Episodic memory is where PyWorkflow's event sourcing pays off most, because the event log already captures everything:
New Event Type:
class EventType(str, Enum):
    # ... existing types
    EPISODE_CREATED = "episode_created"  # NEW
    AGENT_THOUGHT = "agent_thought"      # NEW (optional, for thoughts)

Unique Advantages:
- Zero-Cost Episode Capture: Event log already has complete trace
- Perfect Replay: Can re-execute any past episode deterministically
- Ground Truth: Events are facts, not summaries
- Temporal Ordering: Sequence preserved automatically
- Causal Links: Can trace cause-effect through events
Episode = Past Workflow Run:
Every workflow run in PyWorkflow storage is already a complete episode:
- run_id = episode_id
- events = complete interaction trace
- result = outcome
- status = success/failure
Query Past Episodes:
# Get all successful past runs of a workflow
past_runs = await storage.query_runs(
    workflow_name="customer_support_workflow",
    status="completed",
    limit=100
)

# Each run is an episode
for run in past_runs:
    episode = await episodic_memory.create_episode_from_run(run.run_id)

Trade-offs
Pros:
- PyWorkflow native: Event log already captures episodes
- Learn from successes AND failures
- Full context preserved (not just summaries)
- Can replay episodes for debugging/analysis
- Enables continuous improvement
Cons:
- Episode extraction requires LLM call (cost)
- Embedding generation per episode (overhead)
- Storage grows with episodes (mitigate with pruning)
- Similarity search on large episode DB can be slow
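The last con can often be mitigated before reaching for a dedicated vector database: if episode embeddings are L2-normalized at store time and stacked into one matrix, the per-episode Python loop in `retrieve_similar_episodes` collapses into a single matrix-vector product. A hedged pure-NumPy sketch (the store contents here are random stand-ins):

```python
import numpy as np

rng = np.random.default_rng(0)

# Stand-in store of 10,000 episode embeddings, L2-normalized at write time.
store = rng.standard_normal((10_000, 384))
store /= np.linalg.norm(store, axis=1, keepdims=True)

def top_k(query: np.ndarray, k: int = 3) -> np.ndarray:
    # With normalized rows, cosine similarity is just one dot product per
    # episode — a single BLAS-backed matrix-vector multiply over the store.
    q = query / np.linalg.norm(query)
    sims = store @ q
    # argpartition finds the k best in O(n); only those k are then sorted.
    idx = np.argpartition(sims, -k)[-k:]
    return idx[np.argsort(sims[idx])[::-1]]

query = rng.standard_normal(384)
best = top_k(query)
print(best.shape)  # → (3,)
```

Beyond roughly a million episodes, an approximate-nearest-neighbor index becomes the next step, but for most workflow histories this exact search is fast enough.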
When to Use:
- Workflows with repeated similar tasks (support tickets, orders)
- Learning optimal strategies from past attempts
- Debugging why certain approaches work/fail
- Continuous improvement systems
When to Avoid:
- Unique one-off tasks (no similar episodes)
- Privacy-sensitive scenarios (episodes contain full history)
- Storage-constrained environments
Related Issues
- Short-term / Conversation Memory for pyworkflow_agents #155 - Short-term / Conversation Memory - for current session
- Long-term / Persistent Memory (Cross-Session) for pyworkflow_agents #162 - Long-term / Persistent Memory - semantic facts (vs. episodic experiences)
- Summary Memory (Compressed Context) for pyworkflow_agents #159 - Summary Memory - compressing episodes (trade-off: detail vs. size)