Labels
agents (AI Agent module, pyworkflow_agents) · feature (Feature to be implemented) · memory (Agent memory and context management)
Description
Overview
Summary memory uses LLM-generated summaries of older conversation history to compress context when conversations exceed the context window. Instead of discarding old messages, they are compressed into informative summaries that preserve key information while reducing token count.
How It Works
When conversation history approaches the context window limit:
- Trigger: unobserved messages hit the token threshold (e.g., 30,000 tokens)
- Observer Agent: a background agent compresses old messages into dated observations
- Append: compressed observations are appended to the summary block
- Drop: original messages are removed from the context
- Reflector Agent: when summaries reach their threshold (e.g., 40,000 tokens), they are restructured and condensed
Data Flow:
Messages [0...N] → Token Count > Threshold?
↓ YES
Observer LLM → Summary Block
↓
Drop Original Messages
↓
Summary Block > Threshold?
↓ YES
Reflector LLM → Condensed Summary
Two-Stage Compression:
- Stage 1 (Observer): Raw messages → Dated observations (3-6x compression for text, 5-40x for tool-heavy workloads)
- Stage 2 (Reflector): Observations → Restructured summary (additional 2-3x compression)
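The two stages compose multiplicatively: a 5x Observer pass followed by a 2.5x Reflector pass gives 12.5x end to end. A quick check (the function name is illustrative):

```python
def end_to_end_ratio(original_tokens: int, after_observer: int, after_reflector: int) -> float:
    """Overall compression is the product of the per-stage ratios."""
    stage1 = original_tokens / after_observer   # Observer ratio
    stage2 = after_observer / after_reflector   # Reflector ratio
    assert abs(stage1 * stage2 - original_tokens / after_reflector) < 1e-9
    return original_tokens / after_reflector

# e.g., 45,000 raw tokens -> 9,000 after the Observer (5x)
# -> 3,600 after the Reflector (2.5x) is 12.5x overall
```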
Reference Implementations
- LangChain ConversationSummaryMemory - LLM-based conversation summarization
- Observational Memory (VentureBeat 2025) - Two-agent compression (Observer + Reflector), 10x cost reduction
- ACON Framework (arXiv 2025) - Gradient-free context compression via guideline optimization
- Factory.ai Context Compression - Practical compression techniques and benchmarks
- OpenAI Agents SDK Compression - Compression techniques for session management
Proposed PyWorkflow Implementation
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional

from pyworkflow import workflow, step, get_context
from pyworkflow.engine.events import Event, EventType

# Note: Message, call_llm, call_llm_for_summary, call_llm_for_reflection,
# and the _build_*_prompt helpers are assumed to be defined elsewhere.


@dataclass
class Observation:
    """Compressed summary of a conversation segment"""
    content: str
    start_time: datetime
    end_time: datetime
    message_count: int
    original_tokens: int
    compressed_tokens: int

    def compression_ratio(self) -> float:
        return self.original_tokens / self.compressed_tokens if self.compressed_tokens > 0 else 0.0


@dataclass
class SummaryMemory:
    """Compressed conversation memory"""
    observations: List[Observation] = field(default_factory=list)
    uncompressed_messages: List[Message] = field(default_factory=list)

    # Thresholds
    compress_threshold: int = 30000    # Tokens before compression
    reflection_threshold: int = 40000  # Tokens before re-compression

    async def maybe_compress(self):
        """Compress uncompressed messages if threshold exceeded"""
        total_tokens = sum(msg.token_count() for msg in self.uncompressed_messages)
        if total_tokens >= self.compress_threshold:
            await self._compress_messages()

    async def _compress_messages(self):
        """Observer: Compress messages into observations"""
        if not self.uncompressed_messages:
            return

        # Use LLM to summarize messages
        summary_prompt = self._build_summary_prompt(self.uncompressed_messages)
        summary = await call_llm_for_summary(summary_prompt)

        observation = Observation(
            content=summary,
            start_time=self.uncompressed_messages[0].timestamp,
            end_time=self.uncompressed_messages[-1].timestamp,
            message_count=len(self.uncompressed_messages),
            original_tokens=sum(m.token_count() for m in self.uncompressed_messages),
            compressed_tokens=int(len(summary.split()) * 1.3),  # rough word-to-token estimate
        )
        self.observations.append(observation)

        # Record compaction event
        ctx = get_context()
        await ctx.storage.record_event(Event(
            run_id=ctx.run_id,
            type=EventType.MEMORY_COMPACTED,
            data={
                "observation": observation.__dict__,
                "compression_ratio": observation.compression_ratio(),
                "messages_compressed": observation.message_count,
            },
        ))

        # Clear uncompressed messages
        self.uncompressed_messages.clear()

    async def maybe_reflect(self):
        """Reflector: Re-compress observations if threshold exceeded"""
        total_tokens = sum(
            len(obs.content.split()) * 1.3
            for obs in self.observations
        )
        if total_tokens >= self.reflection_threshold:
            await self._reflect_observations()

    async def _reflect_observations(self):
        """Reflector: Restructure and condense observations"""
        if len(self.observations) <= 1:
            return

        # Use LLM to restructure observations
        reflection_prompt = self._build_reflection_prompt(self.observations)
        condensed_summary = await call_llm_for_reflection(reflection_prompt)

        # Create single condensed observation
        condensed = Observation(
            content=condensed_summary,
            start_time=self.observations[0].start_time,
            end_time=self.observations[-1].end_time,
            message_count=sum(o.message_count for o in self.observations),
            original_tokens=sum(o.original_tokens for o in self.observations),
            compressed_tokens=int(len(condensed_summary.split()) * 1.3),
        )

        # Replace all observations with condensed version
        self.observations = [condensed]

    def get_context(self) -> str:
        """Get compressed context for LLM"""
        parts = []

        # Add observations (compressed history)
        for obs in self.observations:
            parts.append(f"[Summary {obs.start_time} - {obs.end_time}]")
            parts.append(obs.content)

        # Add recent uncompressed messages
        for msg in self.uncompressed_messages:
            parts.append(f"{msg.role}: {msg.content}")

        return "\n\n".join(parts)


@workflow(durable=True)
async def agent_with_summary_memory(user_inputs: List[str]):
    """Agent with automatic context compression"""
    memory = SummaryMemory()

    for user_input in user_inputs:
        # Add user message
        user_msg = Message(role="user", content=user_input)
        memory.uncompressed_messages.append(user_msg)

        # Check if compression needed
        await memory.maybe_compress()
        await memory.maybe_reflect()

        # Get compressed context for LLM
        context = memory.get_context()
        response = await call_llm(context)

        # Add assistant response
        assistant_msg = Message(role="assistant", content=response)
        memory.uncompressed_messages.append(assistant_msg)

    return memory
Integration with Event Sourcing
Summary memory leverages PyWorkflow's event log with a new event type:
New Event Type:
class EventType(str, Enum):
    # ... existing types
    MEMORY_COMPACTED = "memory_compacted"  # NEW
Recording Compression:
# When compression occurs
await ctx.storage.record_event(Event(
    run_id=ctx.run_id,
    type=EventType.MEMORY_COMPACTED,
    data={
        "observation": {
            "content": "Summary of messages 1-50...",
            "start_time": "2026-02-14T10:00:00",
            "end_time": "2026-02-14T10:30:00",
            "message_count": 50,
            "original_tokens": 45000,
            "compressed_tokens": 8000
        },
        "compression_ratio": 5.625,
        "messages_compressed": 50
    }
))
Event Replay:
On workflow resumption, MEMORY_COMPACTED events are replayed to reconstruct the summary memory:
# In EventReplayer._apply_event()
elif event.type == EventType.MEMORY_COMPACTED:
    observation = Observation(**event.data["observation"])
    ctx.summary_memory.observations.append(observation)
Key Advantages:
- Compression is durable: Summaries persist in event log
- Deterministic replay: Same summaries reconstructed on resumption
- Auditable: See when/what was compressed in event history
- Debuggable: Compression ratio and effectiveness tracked
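Deterministic replay can be sketched standalone. The `Event` and `Observation` shapes below are simplified stand-ins for PyWorkflow's actual types, and `replay` is an illustrative reduction over the event log, not the real `EventReplayer`:

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class Observation:
    content: str
    message_count: int

@dataclass
class Event:
    type: str
    data: dict

@dataclass
class SummaryMemory:
    observations: List[Observation] = field(default_factory=list)

def replay(events: List[Event]) -> SummaryMemory:
    """Rebuild summary memory deterministically from the event log."""
    memory = SummaryMemory()
    for event in events:
        if event.type == "memory_compacted":
            memory.observations.append(Observation(**event.data["observation"]))
    return memory

log = [
    Event("step_completed", {}),
    Event("memory_compacted", {"observation": {
        "content": "Summary of messages 1-50", "message_count": 50}}),
]
memory = replay(log)  # one Observation covering 50 messages
```

Because the summary text is read from the recorded event rather than regenerated, replaying the same log always yields the same memory, sidestepping LLM non-determinism on resumption.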
Trade-offs
Pros:
- Extends effective conversation length 3-40x
- Preserves key information from old context
- Reduces token costs significantly
- Enables longer-running conversations within context window
Cons:
- Lossy compression: Detail is lost in summarization
- LLM calls required: Compression itself costs tokens/time
- Latency: Background compression adds processing time
- Quality dependent on summarization prompt: Bad prompts = poor summaries
- Non-deterministic: Same messages may produce different summaries
When to Use:
- Long conversations exceeding context window
- Cost-sensitive applications (10x reduction possible)
- Tasks where gist matters more than exact detail
When to Avoid:
- Need exact message history (legal, compliance)
- Short conversations (overhead not worth it)
- Critical details must be preserved verbatim
Related Issues
- #155 - Short-term / Conversation Memory for pyworkflow_agents - provides uncompressed messages for recent context
- #[Issue 3] - Long-term / Persistent Memory - alternative for cross-session history
- #[Issue 5] - Dual-Layer Context Strategy - combines summary (hot) with retrieval (cold)