Overview
Long-term persistent memory enables agents to remember facts, learnings, and preferences across multiple workflow sessions. Unlike conversation memory (session-scoped), long-term memory persists indefinitely and is retrievable via semantic search, enabling cumulative learning and personalization.
How It Works
Storage & Retrieval Architecture:

Write Path:
- Extract memorable facts from the conversation
- Generate embeddings using an embedding model
- Store in a vector database plus structured storage
- Associate with a user/project namespace

Read Path:
- Embed the current query/context
- Semantic search in the vector database
- Retrieve the top-k relevant memories
- Inject into the LLM context
Data Flow:
```
Conversation → Memory Extraction (LLM)
             → Generate Embeddings
             → Store: Vector DB + Metadata DB

Query → Embed Query
      → Semantic Search (Vector DB)
      → Retrieve Top-K Memories
      → Inject into Context Window
```
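As a minimal sketch of the read path, with toy 4-dimensional embeddings standing in for a real embedding model:

```python
import numpy as np

def cosine(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity between two vectors."""
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))

# Toy memory store: (content, embedding) pairs
memories = [
    ("User prefers concise responses", np.array([0.9, 0.1, 0.0, 0.2])),
    ("Project uses PostgreSQL 16",     np.array([0.1, 0.8, 0.3, 0.0])),
    ("User's timezone is UTC+2",       np.array([0.2, 0.1, 0.9, 0.1])),
]

# Embed the query, score every memory, keep the top-k
query = np.array([0.85, 0.15, 0.05, 0.1])  # e.g. "how verbose should replies be?"
top_k = 2
ranked = sorted(memories, key=lambda m: cosine(m[1], query), reverse=True)
for content, _ in ranked[:top_k]:
    print(content)
```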
Memory Types:
- Episodic: Specific past interactions (see Issue #[Issue 4])
- Semantic: Factual knowledge independent of context
- Procedural: How to perform tasks, successful patterns
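For example, each type maps to a different kind of stored content. A sketch using the `LongTermMemory.store` API proposed later in this issue (the content strings are purely illustrative):

```python
# Episodic: a specific past interaction, anchored in time
await ltm.store("2024-03-02: user requested a refund; resolved via store credit",
                memory_type="episodic")
# Semantic: a context-independent fact
await ltm.store("User's company operates in the healthcare sector",
                memory_type="semantic")
# Procedural: a pattern for how to perform a task
await ltm.store("When deploying service X, run staging smoke tests before promoting",
                memory_type="procedural")
```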
Reference Implementations
- LangChain LangMem SDK - Long-term memory framework with semantic search
- Mem0 Open Source - Hybrid vector + graph memory, hierarchical (user/session/agent levels)
- Memoria Framework - Knowledge graph triplets in SQL + vector embeddings
- Redis AI Agent Memory - Production patterns with Redis for session state
- ChromaDB for Agent Memory - Open-source vector DB, persistent local storage
Proposed PyWorkflow Implementation
PyWorkflow already has multiple storage backends (PostgreSQL, Redis, SQLite, File). We can reuse these for long-term memory storage!
```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional

import numpy as np

from pyworkflow import workflow, step, get_context
from pyworkflow.storage.base import StorageBackend
from pyworkflow.engine.events import Event, EventType

# NOTE: PostgresStorage, RedisStorage, Message, format_messages, call_llm,
# generate_response, and _init_embedding_model (expected to return an object
# with an async `embed(text) -> np.ndarray` method) are placeholders for this
# proposal sketch, not confirmed PyWorkflow APIs.


@dataclass
class Memory:
    """A single long-term memory."""
    memory_id: str
    content: str
    embedding: np.ndarray
    memory_type: str   # "episodic", "semantic", "procedural"
    timestamp: datetime
    namespace: str     # user_id, project_id, etc.
    metadata: Dict[str, Any] = field(default_factory=dict)
    importance: float = 0.5  # 0-1, for filtering

    def similarity(self, query_embedding: np.ndarray) -> float:
        """Cosine similarity to a query embedding."""
        return float(np.dot(self.embedding, query_embedding) / (
            np.linalg.norm(self.embedding) * np.linalg.norm(query_embedding)
        ))


class LongTermMemory:
    """Long-term memory backed by a PyWorkflow storage backend."""

    def __init__(self, storage: StorageBackend, namespace: str):
        self.storage = storage
        self.namespace = namespace
        self.embedding_model = self._init_embedding_model()

    def _generate_id(self) -> str:
        return f"mem_{uuid.uuid4().hex[:12]}"

    async def store(
        self,
        content: str,
        memory_type: str,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> str:
        """Store a memory with its semantic embedding."""
        embedding = await self.embedding_model.embed(content)
        memory = Memory(
            memory_id=self._generate_id(),
            content=content,
            embedding=embedding,
            memory_type=memory_type,
            timestamp=datetime.utcnow(),
            namespace=self.namespace,
            metadata=metadata or {},
        )
        # Persist in PyWorkflow storage as a dedicated event type
        ctx = get_context()
        await self.storage.record_event(Event(
            run_id=ctx.run_id,
            type=EventType.MEMORY_STORED,
            data={
                "memory_id": memory.memory_id,
                "content": memory.content,
                "embedding": memory.embedding.tolist(),
                "memory_type": memory.memory_type,
                "timestamp": memory.timestamp.isoformat(),
                "namespace": memory.namespace,
                "metadata": memory.metadata,
            },
        ))
        return memory.memory_id

    async def retrieve(
        self,
        query: str,
        top_k: int = 5,
        memory_type: Optional[str] = None,
        min_similarity: float = 0.7,
    ) -> List[Memory]:
        """Retrieve relevant memories via semantic search."""
        query_embedding = await self.embedding_model.embed(query)

        # Load all memories for this namespace from storage.
        # In production: use a vector DB index instead of a full scan.
        memories = await self._load_memories_from_storage(
            namespace=self.namespace,
            memory_type=memory_type,
        )

        # Score against the query, filter by threshold, return the top-k
        scored = [(m, m.similarity(query_embedding)) for m in memories]
        filtered = [(m, s) for m, s in scored if s >= min_similarity]
        filtered.sort(key=lambda pair: pair[1], reverse=True)
        return [m for m, _ in filtered[:top_k]]

    async def _load_memories_from_storage(
        self,
        namespace: str,
        memory_type: Optional[str] = None,
    ) -> List[Memory]:
        """Load memories from the PyWorkflow storage backend."""
        # Query storage for MEMORY_STORED events; this could be
        # optimized with storage-specific indexes.
        if isinstance(self.storage, PostgresStorage):
            # Use a SQL query for efficient filtering
            query = """
                SELECT data FROM events
                WHERE type = 'MEMORY_STORED'
                  AND data->>'namespace' = %s
            """
            params = [namespace]
            if memory_type:
                query += " AND data->>'memory_type' = %s"
                params.append(memory_type)
            results = await self.storage.execute_query(query, params)
        elif isinstance(self.storage, RedisStorage):
            # Use a Redis key scan
            pattern = f"memory:{namespace}:*"
            results = await self.storage.scan_pattern(pattern)
        else:
            # Fallback: scan all events (inefficient; fine for SQLite/File)
            all_events = await self.storage.get_events_by_type(EventType.MEMORY_STORED)
            results = [
                e.data for e in all_events
                if e.data.get("namespace") == namespace
            ]

        return [
            Memory(
                memory_id=data["memory_id"],
                content=data["content"],
                embedding=np.array(data["embedding"]),
                memory_type=data["memory_type"],
                timestamp=(
                    datetime.fromisoformat(data["timestamp"])
                    if "timestamp" in data else datetime.utcnow()
                ),
                namespace=data["namespace"],
                metadata=data.get("metadata", {}),
            )
            for data in results
        ]


@step()
async def extract_and_store_memories(conversation: List[Message], namespace: str):
    """Extract memorable facts from a conversation and store them."""
    ltm = LongTermMemory(storage=get_context().storage, namespace=namespace)

    # Use an LLM to extract memorable facts
    extraction_prompt = f"""
    Extract important facts, learnings, or preferences from this conversation:
    {format_messages(conversation)}
    Return each fact as a separate line.
    """
    facts = await call_llm(extraction_prompt)

    # Store each fact as a separate memory
    memory_ids = []
    for fact in facts.split("\n"):
        if fact.strip():
            memory_id = await ltm.store(
                content=fact.strip(),
                memory_type="semantic",
                metadata={"source": "conversation_extraction"},
            )
            memory_ids.append(memory_id)
    return memory_ids


@step()
async def retrieve_relevant_context(query: str, namespace: str) -> str:
    """Retrieve relevant long-term memories for the current query."""
    ltm = LongTermMemory(storage=get_context().storage, namespace=namespace)
    memories = await ltm.retrieve(query, top_k=5, min_similarity=0.7)

    # Format for the LLM context window
    context_parts = ["Relevant past knowledge:"]
    for mem in memories:
        context_parts.append(f"- {mem.content}")
    return "\n".join(context_parts)


@workflow(durable=True)
async def agent_with_long_term_memory(user_id: str, query: str):
    """Agent that remembers across sessions."""
    namespace = f"user:{user_id}"

    # Retrieve relevant memories from past sessions
    past_context = await retrieve_relevant_context(query, namespace)

    # Generate a response informed by past context
    response = await generate_response(query, past_context)

    # Extract new memories from this exchange and store them
    conversation = [
        Message(role="user", content=query),
        Message(role="assistant", content=response),
    ]
    await extract_and_store_memories(conversation, namespace)
    return response
```
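A hypothetical two-session usage showing the cross-session effect (the invocation style is illustrative; a real deployment would go through PyWorkflow's runner):

```python
# Session 1: the exchange is mined for facts, which are stored as memories.
await agent_with_long_term_memory("12345", "Please keep your answers short.")

# Session 2 (a new run, days later): retrieval surfaces the stored
# preference, so generate_response sees "User prefers concise responses".
await agent_with_long_term_memory("12345", "Explain event sourcing.")
```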
Integration with Event Sourcing
Long-term memory is a perfect fit for PyWorkflow's event log:
New Event Type:
```python
class EventType(str, Enum):
    # ... existing types
    MEMORY_STORED = "memory_stored"  # NEW
```
Event Storage:
```python
await ctx.storage.record_event(Event(
    run_id=ctx.run_id,
    type=EventType.MEMORY_STORED,
    data={
        "memory_id": "mem_abc123",
        "content": "User prefers concise responses",
        "embedding": [0.23, -0.45, 0.67, ...],  # 1536-dim vector (truncated)
        "memory_type": "semantic",
        "namespace": "user:12345",
        "metadata": {"source": "preference_extraction"}
    }
))
```
Storage Backend Reuse:
PyWorkflow already has 4 storage backends:
| Backend | Use Case | Memory Advantages |
|---|---|---|
| PostgreSQL | Production, large-scale | pgvector extension for efficient vector search |
| Redis | Fast, in-memory | RediSearch module for vector similarity |
| SQLite | Embedded, single-file | sqlite-vss extension for vectors |
| File | Development, simple | JSON storage (slow for large memories) |
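For illustration, wiring the proposed `LongTermMemory` to one of these backends might look like the following sketch (the `pyworkflow.storage.postgres` import path and the `PostgresStorage` constructor arguments are assumptions, not confirmed API):

```python
from pyworkflow.storage.postgres import PostgresStorage  # assumed module path

# Hypothetical constructor arguments
storage = PostgresStorage(dsn="postgresql://localhost/pyworkflow")
ltm = LongTermMemory(storage=storage, namespace="user:12345")

memory_id = await ltm.store(
    content="User prefers concise responses",
    memory_type="semantic",
)
```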
Query Optimization:
For production deployments with PostgreSQL:
```sql
-- Add the pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;

-- Create a memories table with a vector column
CREATE TABLE memories (
    memory_id TEXT PRIMARY KEY,
    content TEXT,
    embedding vector(1536),  -- pgvector type
    memory_type TEXT,
    namespace TEXT,
    metadata JSONB,
    timestamp TIMESTAMP DEFAULT NOW()
);

-- Create an index for fast approximate vector search
CREATE INDEX ON memories USING ivfflat (embedding vector_cosine_ops);

-- Fast semantic search query (<=> is cosine *distance* in pgvector,
-- so similarity = 1 - distance)
SELECT memory_id, content, 1 - (embedding <=> $1) AS similarity
FROM memories
WHERE namespace = $2
ORDER BY embedding <=> $1
LIMIT 5;
```
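A sketch of how the retrieve path could run this query from Python, using asyncpg directly (this assumes the `memories` table from the DDL above; the embedding is passed as a text literal and cast to `vector` to avoid registering a custom codec):

```python
import asyncpg

async def search_memories(pool: asyncpg.Pool, query_embedding: list[float],
                          namespace: str, top_k: int = 5):
    """Top-k semantic search over the memories table via pgvector."""
    # pgvector accepts a vector literal such as '[0.1,0.2,...]'
    vec = "[" + ",".join(f"{x:g}" for x in query_embedding) + "]"
    rows = await pool.fetch(
        """
        SELECT memory_id, content,
               1 - (embedding <=> $1::text::vector) AS similarity
        FROM memories
        WHERE namespace = $2
        ORDER BY embedding <=> $1::text::vector
        LIMIT $3
        """,
        vec, namespace, top_k,
    )
    return [(r["memory_id"], r["content"], r["similarity"]) for r in rows]
```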
Trade-offs
Pros:
- Cumulative learning across sessions
- Personalization (remembers user preferences)
- Semantic retrieval (not just keyword matching)
- Scales to millions of memories with proper indexing
Cons:
- Requires vector database or extension (additional dependency)
- Embedding generation costs (API calls or local model)
- Storage overhead (embeddings are large: 1536 float32 values ≈ 6 KB each)
- Retrieval latency (vector search on large datasets)
- Privacy concerns (long-term data retention)
When to Use:
- Multi-session agents (customer support, personal assistants)
- Learning from past interactions (improve over time)
- Personalization (user-specific preferences)
- Knowledge accumulation (research, documentation)
When to Avoid:
- Single-session tasks (use conversation memory)
- Privacy-sensitive applications (unless encrypted)
- Resource-constrained environments (embedding overhead)
Related Issues
- #155 - Short-term / Conversation Memory - for current session context
- #[Issue 4] - Episodic Memory - specialized long-term memory for past experiences
- #[Issue 5] - Dual-Layer Context Strategy - combines short-term (hot) + long-term (cold)