Overview
The Dual-Layer Context Strategy is the 2026 production standard for agent context management. It addresses the fundamental misconception that large context windows (200K-400K tokens) have "solved" memory. Instead of injecting full history into every API call, this architecture splits memory into:
- Hot Path: Recent messages + summarized state (in context window, fast)
- Cold Path: External stores for historical retrieval (vector DB, storage, on-demand)
This pattern achieves sustainable cost and latency while maintaining long-term knowledge.
How It Works
Two-Layer Architecture:
```
┌─────────────────────────────────────────┐
│ HOT PATH (Context Window)               │
│ - Last N messages (uncompressed)        │
│ - Current workflow state                │
│ - Active variables/context              │
│ - Compressed summaries of older msgs    │
│ Size: ~50K tokens (fits in window)      │
└─────────────────────────────────────────┘
          ↑
          │ Fast access (in-memory)
          │
┌─────────────────────────────────────────┐
│ COLD PATH (External Storage)            │
│ - Full conversation history             │
│ - Past workflow runs (episodic)         │
│ - Semantic knowledge base               │
│ - User preferences                      │
│ Size: Unlimited (storage backend)       │
└─────────────────────────────────────────┘
          ↑
          │ Semantic retrieval (on-demand)
```
Data Flow:
1. Query arrives → check the hot path first (recent context)
2. If context is insufficient → semantic search in the cold path
3. Retrieve relevant memories → inject into the hot path
4. Generate a response using the hot path context
5. After the turn → the Memory Node decides what to save to the cold path
6. On hot path overflow → compress the oldest messages and move them to the cold path
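The flow above can be sketched as a minimal, self-contained toy (plain Python with in-memory lists standing in for the stores; all names are illustrative, not the PyWorkflow API):

```python
# Toy sketch of the dual-layer data flow: the hot path holds recent turns,
# overflow is moved to a cold store, and cold memories are retrieved on demand.

MAX_HOT = 4  # keep at most this many uncompressed messages in the hot path

hot_path: list[str] = []   # recent, uncompressed messages
cold_path: list[str] = []  # archived history (unbounded)

def add_message(msg: str) -> None:
    hot_path.append(msg)
    if len(hot_path) > MAX_HOT:
        # Step 6: hot path overflow -> move the oldest messages to the cold path
        overflow_count = len(hot_path) - MAX_HOT
        cold_path.extend(hot_path[:overflow_count])
        del hot_path[:overflow_count]

def retrieve_from_cold(query: str, top_k: int = 2) -> list[str]:
    # Step 2: stand-in for semantic search -- a naive substring match
    return [m for m in cold_path if query in m][:top_k]

def build_context(query: str) -> list[str]:
    # Steps 1-3: relevant archived memories plus the recent messages
    return retrieve_from_cold(query) + hot_path

for i in range(6):
    add_message(f"msg-{i}")

print(len(hot_path))   # 4 -- bounded hot path
print(len(cold_path))  # 2 -- overflow archived
print(build_context("msg-0"))  # ['msg-0', 'msg-2', 'msg-3', 'msg-4', 'msg-5']
```

Real implementations replace the substring match with embedding search and the lists with durable storage, but the routing logic is the same.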
Key Decision: What goes where?
| Memory Type | Hot Path | Cold Path |
|---|---|---|
| Last 10 messages | Yes | Yes (archived) |
| Current workflow state | Yes | Yes (checkpointed) |
| Summary of last hour | Yes | No |
| Full history (past days) | No | Yes |
| Semantic knowledge | No | Yes (indexed) |
| User preferences | Retrieved on-demand | Yes |
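The routing table above can be expressed as a small lookup function (a sketch; the memory-type keys mirror the table rows, everything else is illustrative):

```python
# Route a memory type to the layer(s) that should hold it,
# mirroring the hot/cold decision table above.

ROUTING = {
    "recent_messages":    {"hot", "cold"},  # last N messages, also archived
    "workflow_state":     {"hot", "cold"},  # checkpointed to the cold path
    "recent_summary":     {"hot"},          # summary of the last hour
    "full_history":       {"cold"},         # past days, retrieval only
    "semantic_knowledge": {"cold"},         # indexed for search
    "user_preferences":   {"cold"},         # retrieved on demand into the hot path
}

def layers_for(memory_type: str) -> set[str]:
    # Default for unknown types: archive to cold, don't inject into hot
    return ROUTING.get(memory_type, {"cold"})

print(layers_for("recent_summary"))      # {'hot'}
print(layers_for("semantic_knowledge"))  # {'cold'}
```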
Reference Implementations
- Memory for AI Agents: Context Engineering (The New Stack) - 2026 production standard: hot/cold path
- AI Agent Memory Systems Guide (Digital Applied) - Dual-layer memory architecture patterns
- Mem0: Production-Ready Long-Term Memory (arXiv) - Hot path + Zep/Mem0/Pinecone cold path
- AI Memory Layer Guide (Mem0, Dec 2025) - Memory Node synthesis pattern
- ICLR 2026 Workshop: MemAgents - Memory for LLM-based agentic systems
2026 Production Stack:
- Hot Path: LangGraph checkpointing with PostgresSaver (replaces Redis)
- Cold Path (Preferences): Mem0 for user personalization
- Cold Path (Knowledge): Zep for Temporal Knowledge Graphs
- Cold Path (Episodes): Vector DB (Pinecone, Weaviate) for semantic search
Proposed PyWorkflow Implementation
PyWorkflow is perfectly positioned for a dual-layer architecture:
- Hot Path: Current workflow context (from event replay)
- Cold Path: PyWorkflow storage backends + past runs
```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional, Dict, Any

import numpy as np

from pyworkflow import workflow, step, get_context
from pyworkflow.storage.base import StorageBackend
from pyworkflow.engine.events import Event, EventType


@dataclass
class Message:
    """A single conversational message (fields as used by the archive format)."""
    role: str
    content: str
    timestamp: datetime = field(default_factory=datetime.utcnow)


@dataclass
class HotPathContext:
    """In-memory context for the current workflow (context window)."""
    recent_messages: List[Message] = field(default_factory=list)
    compressed_summary: Optional[str] = None
    current_state: Dict[str, Any] = field(default_factory=dict)
    retrieved_memories: List[str] = field(default_factory=list)
    max_messages: int = 20     # keep last N messages uncompressed
    max_tokens: int = 50000    # context window budget

    def get_context_for_llm(self) -> str:
        """Format hot path context for the LLM."""
        parts = []
        # Add the compressed summary (if it exists)
        if self.compressed_summary:
            parts.append(f"[Earlier conversation summary]\n{self.compressed_summary}\n")
        # Add retrieved cold path memories
        if self.retrieved_memories:
            parts.append("[Relevant past knowledge]")
            for memory in self.retrieved_memories:
                parts.append(f"- {memory}")
            parts.append("")
        # Add recent messages (uncompressed)
        parts.append("[Recent conversation]")
        for msg in self.recent_messages[-self.max_messages:]:
            parts.append(f"{msg.role}: {msg.content}")
        return "\n".join(parts)

    def add_message(self, message: Message) -> Optional[List[Message]]:
        """Add a message to the hot path; return any overflow to archive."""
        self.recent_messages.append(message)
        # Once we exceed twice the budget, split off the oldest half
        if len(self.recent_messages) > self.max_messages * 2:
            to_compress = self.recent_messages[:self.max_messages]
            self.recent_messages = self.recent_messages[self.max_messages:]
            return to_compress  # signal: move to cold path
        return None
```
```python
@dataclass
class ColdPathStorage:
    """External storage for full history (unlimited)."""
    storage: StorageBackend
    namespace: str

    async def store_conversation_archive(self, messages: List[Message]) -> None:
        """Archive old messages to the cold path."""
        ctx = get_context()
        await ctx.storage.record_event(Event(
            run_id=ctx.run_id,
            type=EventType.CONVERSATION_ARCHIVED,
            data={
                "messages": [
                    {
                        "role": msg.role,
                        "content": msg.content,
                        "timestamp": msg.timestamp.isoformat(),
                    }
                    for msg in messages
                ],
                "namespace": self.namespace,
            },
        ))

    async def retrieve_relevant_context(self, query: str, top_k: int = 5) -> List[str]:
        """Semantic search in the cold path."""
        # Embed the query (_embed wraps the embedding backend; not shown here)
        query_embedding = await self._embed(query)
        # Search past workflow runs (episodic memory); these results can be
        # merged with the semantic hits below
        past_runs = await self.storage.query_runs(
            workflow_name=get_context().workflow_name,
            status="completed",
            limit=100,
        )
        # Search long-term semantic memory
        memories = await self._search_long_term_memory(query_embedding, top_k=top_k)
        return memories

    async def _search_long_term_memory(
        self,
        query_embedding: np.ndarray,
        top_k: int,
    ) -> List[str]:
        """Search MEMORY_STORED events via vector (cosine) similarity."""
        # Get all memory events, then filter by namespace
        memory_events = await self.storage.get_events_by_type(EventType.MEMORY_STORED)
        scored = []
        for event in memory_events:
            if event.data.get("namespace") != self.namespace:
                continue
            embedding = np.array(event.data["embedding"])
            # Cosine similarity between the query and the stored embedding
            similarity = np.dot(query_embedding, embedding) / (
                np.linalg.norm(query_embedding) * np.linalg.norm(embedding)
            )
            scored.append((event.data["content"], similarity))
        # Sort by similarity and return the top-k contents
        sorted_memories = sorted(scored, key=lambda x: x[1], reverse=True)
        return [content for content, score in sorted_memories[:top_k]]
```
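The similarity ranking inside `_search_long_term_memory` can be checked in isolation with toy embeddings (plain NumPy, no PyWorkflow required; `rank_by_cosine` is an illustrative stand-in):

```python
import numpy as np

def rank_by_cosine(query, memories, top_k):
    # Same scoring as _search_long_term_memory: cosine similarity, then top-k
    scored = []
    for content, emb in memories:
        sim = np.dot(query, emb) / (np.linalg.norm(query) * np.linalg.norm(emb))
        scored.append((content, sim))
    scored.sort(key=lambda x: x[1], reverse=True)
    return [content for content, _ in scored[:top_k]]

query = np.array([1.0, 0.0])
memories = [
    ("orthogonal fact", np.array([0.0, 1.0])),  # cosine 0.0
    ("aligned fact",    np.array([2.0, 0.0])),  # cosine 1.0 (scale-invariant)
    ("diagonal fact",   np.array([1.0, 1.0])),  # cosine ~0.71
]
print(rank_by_cosine(query, memories, top_k=2))  # ['aligned fact', 'diagonal fact']
```

Note that cosine similarity ignores vector magnitude, which is why the scaled `[2.0, 0.0]` embedding still scores a perfect 1.0.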
```python
class DualLayerContext:
    """Dual-layer context management (hot + cold)."""

    def __init__(self, storage: StorageBackend, namespace: str):
        self.hot_path = HotPathContext()
        self.cold_path = ColdPathStorage(storage=storage, namespace=namespace)

    async def add_message(self, message: Message) -> None:
        """Add a message to the hot path; archive overflow to the cold path."""
        to_archive = self.hot_path.add_message(message)
        # If the hot path overflowed, archive the oldest messages
        if to_archive:
            await self.cold_path.store_conversation_archive(to_archive)

    async def get_context_for_llm(self, current_query: str) -> str:
        """Get context from both the hot and cold paths."""
        # Retrieve relevant memories from the cold path
        cold_memories = await self.cold_path.retrieve_relevant_context(
            query=current_query,
            top_k=5,
        )
        # Inject cold memories into the hot path (temporarily)
        self.hot_path.retrieved_memories = cold_memories
        # Return the hot path context (now including the cold memories)
        return self.hot_path.get_context_for_llm()

    async def save_to_cold_path(self, memory_type: str, content: str) -> None:
        """Memory Node: persist a fact to the cold path.

        Called after each conversational turn; the LLM decides what is
        worth saving long-term.
        """
        ctx = get_context()
        # Generate an embedding via the cold path's embedding backend
        embedding = await self.cold_path._embed(content)
        # Store in the cold path as a MEMORY_STORED event
        await ctx.storage.record_event(Event(
            run_id=ctx.run_id,
            type=EventType.MEMORY_STORED,
            data={
                "content": content,
                "memory_type": memory_type,
                "namespace": self.cold_path.namespace,
                "embedding": embedding.tolist(),
            },
        ))
```
```python
@step()
async def memory_node_synthesis(conversation: List[Message], namespace: str):
    """Memory Node: decide what to save to the cold path after each turn."""
    ctx = get_context()
    dual_layer = DualLayerContext(storage=ctx.storage, namespace=namespace)
    # Use the LLM to extract memorable facts from the last 5 messages
    # (format_messages and call_llm are assumed helpers, not shown here)
    extraction_prompt = f"""
From this conversation, extract facts worth remembering long-term:

{format_messages(conversation[-5:])}

Return important facts, preferences, or learnings (one per line).
Return empty if nothing memorable.
"""
    facts = await call_llm(extraction_prompt)
    # Save each non-empty fact to the cold path
    for fact in facts.split("\n"):
        if fact.strip():
            await dual_layer.save_to_cold_path(
                memory_type="semantic",
                content=fact.strip(),
            )
```
```python
@workflow(durable=True)
async def agent_with_dual_layer_context(user_id: str, messages: List[str]):
    """Agent with hot/cold path context management."""
    namespace = f"user:{user_id}"
    ctx = get_context()
    dual_layer = DualLayerContext(storage=ctx.storage, namespace=namespace)
    responses = []
    for user_input in messages:
        # Add the user message to the hot path
        user_msg = Message(role="user", content=user_input)
        await dual_layer.add_message(user_msg)
        # Build context from the hot + cold paths
        llm_context = await dual_layer.get_context_for_llm(user_input)
        # Generate a response
        response = await call_llm(llm_context)
        # Add the assistant response to the hot path
        assistant_msg = Message(role="assistant", content=response)
        await dual_layer.add_message(assistant_msg)
        # Memory Node: extract and save to the cold path
        await memory_node_synthesis(
            conversation=[user_msg, assistant_msg],
            namespace=namespace,
        )
        responses.append(response)
    return responses
```
Integration with Event Sourcing
Dual-layer strategy maps naturally to PyWorkflow architecture:
Hot Path = Current Workflow Context:
- Restored from event replay on workflow resumption
- In-memory, fast access
- Bounded by context window
Cold Path = PyWorkflow Storage Backends:
- Full event log (complete history)
- Past workflow runs (episodic memory)
- MEMORY_STORED events (semantic knowledge)
- CONVERSATION_ARCHIVED events (old messages)
New Event Types:
```python
class EventType(str, Enum):
    # ... existing types
    CONVERSATION_ARCHIVED = "conversation_archived"  # NEW
    MEMORY_STORED = "memory_stored"  # from Issue #162
```
Automatic Hot/Cold Routing:
PyWorkflow's event replay automatically populates hot path:
```python
# On workflow resumption (after suspension/crash)
async def replay_events(ctx, events):
    for event in events:
        if event.type == EventType.AGENT_MESSAGE:
            # Restore the hot path conversation
            ctx.dual_layer.hot_path.add_message(Message(...))
        elif event.type == EventType.CONVERSATION_ARCHIVED:
            # Already in the cold path; don't load into the hot path
            pass
        elif event.type == EventType.MEMORY_STORED:
            # Available for cold path retrieval
            pass
```
Key Advantage: PyWorkflow's durable execution means the hot path is always recoverable from events, while the cold path provides unlimited historical context.
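The replay routing can be exercised with a toy event list (plain Python stand-ins for the event types and hot path; no PyWorkflow required):

```python
# Toy replay: only conversational events repopulate the hot path;
# archived messages and stored memories stay in the cold path.
from dataclasses import dataclass

@dataclass
class ToyEvent:
    type: str        # stand-in for EventType
    content: str = ""

def replay(events):
    hot_path = []
    for event in events:
        if event.type == "agent_message":
            hot_path.append(event.content)  # restore the hot path
        elif event.type in ("conversation_archived", "memory_stored"):
            pass  # cold path only; not loaded into the window
    return hot_path

events = [
    ToyEvent("agent_message", "hi"),
    ToyEvent("conversation_archived", "old msgs"),
    ToyEvent("memory_stored", "user likes tea"),
    ToyEvent("agent_message", "hello!"),
]
print(replay(events))  # ['hi', 'hello!']
```

This mirrors the key property above: the hot path is a pure function of the event log, so it is always recoverable after a crash.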
Trade-offs
Pros:
- Sustainable cost: Don't inject full history every call
- Lower latency: Smaller context = faster LLM inference
- Unlimited history: Cold path scales indefinitely
- Semantic retrieval: Find relevant past context, not just recent
- Production-ready: 2026 industry standard pattern
Cons:
- Retrieval overhead: Cold path lookup adds latency
- Retrieval quality: Depends on embedding model and indexing
- Complexity: Two systems to manage (hot + cold)
- Context window still needed: the hot path requires a decent size (50K+ tokens)
When to Use:
- Always, in production 2026+ (this is the standard)
- Long-running multi-session agents
- Cost-sensitive applications (avoid dumping full history)
- Agents with large knowledge bases
When to Avoid:
- Extremely short conversations (overhead not worth it)
- Real-time latency critical (cold path retrieval adds ~100-500ms)
Related Issues
- Short-term / Conversation Memory for pyworkflow_agents #155 - hot path recent messages
- Summary Memory (Compressed Context) for pyworkflow_agents #159 - hot path compressed summaries
- Long-term / Persistent Memory (Cross-Session) for pyworkflow_agents #162 - cold path semantic knowledge
- Episodic Memory (Past Experiences) for pyworkflow_agents #167 - cold path past experiences