
Short-term / Conversation Memory for pyworkflow_agents #155

@yasha-dev1

Overview

Short-term / conversation memory stores the current conversation messages within the LLM's context window for a single agent session. This is the most basic memory type, providing immediate context awareness during an active workflow run.

How It Works

Messages are stored sequentially as they occur during the conversation:

  1. User message arrives → stored in conversation buffer
  2. Agent response generated → stored in conversation buffer
  3. Context window fills → oldest messages trimmed or summarized
  4. All messages in buffer included in next LLM call

Data Flow:

User Input → ConversationMemory.add(message)
           → ConversationMemory.get_context(max_tokens)
           → LLM Context Window
           → Agent Response
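
To make this flow concrete, here is a minimal sketch using the ConversationMemory and Message types proposed in the implementation below (the LLM call itself is omitted):

memory = ConversationMemory(max_tokens=100_000)

# 1-2. User message and agent response are stored as they occur
memory.add(Message(role="user", content="What's the weather?"))
memory.add(Message(role="assistant", content="Sunny, around 20°C."))

# 3-4. get_context() drops the oldest messages that no longer fit the limit;
#      the surviving messages are what goes into the next LLM call
context = memory.get_context()
assert context[-1].role == "assistant"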

Reference Implementations

Proposed PyWorkflow Implementation

from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional
from pyworkflow import workflow, step, get_context
from pyworkflow.engine.events import Event, EventType

@dataclass
class Message:
    """Single conversation message"""
    role: str  # "user", "assistant", "system"
    content: str
    timestamp: datetime = field(default_factory=datetime.utcnow)
    metadata: dict = field(default_factory=dict)
    
    def token_count(self) -> int:
        """Estimate token count (rough heuristic: ~1.3 tokens per word)"""
        return int(len(self.content.split()) * 1.3)

@dataclass
class ConversationMemory:
    """Short-term conversation memory within context window"""
    messages: List[Message] = field(default_factory=list)
    max_tokens: int = 100000  # Context window limit
    
    def add(self, message: Message):
        """Add message to conversation history"""
        self.messages.append(message)
    
    def get_context(self, max_tokens: Optional[int] = None) -> List[Message]:
        """Get messages that fit within token limit"""
        limit = max_tokens or self.max_tokens
        total_tokens = 0
        result = []
        
        # Include messages from most recent backwards
        for msg in reversed(self.messages):
            msg_tokens = msg.token_count()
            if total_tokens + msg_tokens > limit:
                break
            result.insert(0, msg)
            total_tokens += msg_tokens
        
        return result
    
    def clear(self):
        """Clear conversation history"""
        self.messages.clear()

@step()
async def process_user_message(user_input: str, memory: ConversationMemory) -> str:
    """Process user message with conversation context"""
    # Add user message to memory
    user_msg = Message(role="user", content=user_input)
    memory.add(user_msg)
    
    # Get context that fits in window
    context_messages = memory.get_context()
    
    # Call LLM with conversation history
    response = await call_llm(context_messages)
    
    # Store assistant response
    assistant_msg = Message(role="assistant", content=response)
    memory.add(assistant_msg)
    
    return response

@workflow(durable=True)
async def conversational_agent(user_inputs: List[str]):
    """Agent with conversation memory"""
    memory = ConversationMemory()
    
    for user_input in user_inputs:
        response = await process_user_message(user_input, memory)
        # Continue conversation...
    
    return memory.messages
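
The call_llm helper used in process_user_message above is not defined in the proposal. A minimal sketch, assuming an OpenAI-style async chat-completions client (the openai Python SDK, v1+); any client with an equivalent chat API could be substituted:

from typing import List
from openai import AsyncOpenAI  # assumption: not part of the PyWorkflow proposal

_client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

async def call_llm(messages: List[Message]) -> str:
    """Illustrative stand-in for the call_llm referenced above."""
    response = await _client.chat.completions.create(
        model="gpt-4o-mini",  # illustrative model choice
        messages=[{"role": m.role, "content": m.content} for m in messages],
    )
    return response.choices[0].message.content or ""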

Integration with Event Sourcing

Short-term memory maps naturally to PyWorkflow's event log:

Event Storage:

# Each message = AGENT_MESSAGE event
await ctx.storage.record_event(Event(
    run_id=ctx.run_id,
    type=EventType.AGENT_MESSAGE,
    data={
        "role": "user",
        "content": "What's the weather?",
        "timestamp": datetime.utcnow().isoformat()
    }
))
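
A sketch of how process_user_message could emit these events alongside the in-memory buffer, reusing ctx.storage.record_event from above; obtaining the context inside a step via get_context() is an assumption here:

@step()
async def process_user_message(user_input: str, memory: ConversationMemory) -> str:
    """Same step as above, extended to record AGENT_MESSAGE events"""
    ctx = get_context()  # assumption: step-level access to the workflow context

    async def record(msg: Message):
        # Persist the message so event replay can rebuild the buffer later
        await ctx.storage.record_event(Event(
            run_id=ctx.run_id,
            type=EventType.AGENT_MESSAGE,
            data={
                "role": msg.role,
                "content": msg.content,
                "timestamp": msg.timestamp.isoformat(),
            },
        ))

    user_msg = Message(role="user", content=user_input)
    await record(user_msg)
    memory.add(user_msg)

    response = await call_llm(memory.get_context())

    assistant_msg = Message(role="assistant", content=response)
    await record(assistant_msg)
    memory.add(assistant_msg)
    return response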

Event Replay:
On workflow resumption after suspension, all AGENT_MESSAGE events are replayed to reconstruct the conversation buffer:

# In EventReplayer._apply_event()
elif event.type == EventType.AGENT_MESSAGE:
    ctx.conversation_memory.add(Message(
        role=event.data["role"],
        content=event.data["content"],
        timestamp=datetime.fromisoformat(event.data["timestamp"])
    ))

Key Benefits:

  • Fully replayable: Entire conversation reconstructed from events
  • Durable: Survives worker crashes via event replay
  • Inspectable: Full conversation history in event log
  • Debuggable: Time-travel through conversation via event sequence

Trade-offs

Pros:

  • Simple to implement and understand
  • Fast access (in-memory)
  • No external dependencies
  • Works well for short conversations

Cons:

  • Bounded by context window (typically 100K-200K tokens)
  • Loses oldest messages when trimming
  • No semantic search capability
  • Session-scoped only (doesn't persist across workflow runs)

When to Use:

  • Single workflow run conversations
  • Tasks requiring immediate recent context
  • Complement to long-term memory (hot path)

When to Avoid:

  • Multi-session conversations
  • Need to search past conversations semantically
  • Conversations exceeding context window regularly

Related Issues

  • #[Issue 2] - Summary Memory (Compressed Context) - for handling context overflow
  • #[Issue 3] - Long-term / Persistent Memory - for cross-session memory
  • #[Issue 5] - Dual-Layer Context Strategy - hot/cold path combining short-term + long-term
