
State Delta Strategy (Efficient Multi-Agent Context) for pyworkflow_agents #173

@yasha-dev1


Overview

The State Delta Strategy passes only state changes between agents instead of the full conversation history, which can substantially reduce token costs in multi-agent systems. Rather than sending the entire conversation to each agent, only new updates (deltas) are transmitted, while each agent maintains its own view of the shared state.

This mirrors the state-channel pattern used in LangGraph, where each state key has a reducer that merges incoming updates, and it enables efficient multi-agent orchestration.

How It Works

Traditional Multi-Agent (Inefficient):

Agent A: Full conversation (1000 messages) → LLM
Agent B: Full conversation (1000 messages) → LLM
Agent C: Full conversation (1000 messages) → LLM

Total sent: 3 × 1000 = 3000 messages

State Delta Strategy (Efficient):

Shared State: {counter: 0, status: "pending", results: []}

Agent A: Reads state → Updates {counter: 1} → Writes delta
Agent B: Reads state + delta → Updates {status: "processing"} → Writes delta
Agent C: Reads state + deltas → Updates {results: ["done"]} → Writes delta

Total sent: only the 3 deltas, instead of the full state 3 times
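The comparison above can be made concrete with a toy sketch. It assumes a plain dict as the shared state and a simple overwrite merge, and it uses serialized JSON size as a rough stand-in for tokens; the numbers it produces are illustrative only, not measurements of any LLM.

```python
import json
from typing import Any


def apply_delta(state: dict[str, Any], delta: dict[str, Any]) -> dict[str, Any]:
    """Return a new state with the delta's keys overwritten (simple replace merge)."""
    return {**state, **delta}


state: dict[str, Any] = {"counter": 0, "status": "pending", "results": []}
agent_deltas = [
    ("A", {"counter": 1}),
    ("B", {"status": "processing"}),
    ("C", {"results": ["done"]}),
]

delta_bytes = 0       # bytes sent if agents exchange only deltas
full_state_bytes = 0  # bytes sent if every update ships the whole state
for agent, delta in agent_deltas:
    delta_bytes += len(json.dumps(delta))
    state = apply_delta(state, delta)
    full_state_bytes += len(json.dumps(state))

print(state)
print(f"delta: {delta_bytes} bytes, full state: {full_state_bytes} bytes")
```

The gap grows with the size of the shared state, since each delta stays small while the full snapshot keeps growing.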

Data Flow:

┌─────────────────────────────┐
│      Shared State Graph     │
│  {                          │
│    counter: 0,              │
│    status: "pending",       │
│    results: [],             │
│    metadata: {}             │
│  }                          │
└─────────────────────────────┘
         ↓ ↑ ↓ ↑ ↓ ↑
    ┌────┘   │   └────┐
    ↓        ↓        ↓
┌────────┐ ┌────────┐ ┌────────┐
│Agent A │ │Agent B │ │Agent C │
│(delta) │ │(delta) │ │(delta) │
└────────┘ └────────┘ └────────┘

Each agent:
1. Reads current state
2. Performs action
3. Writes ONLY changes (delta)
4. State graph applies delta
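The four-step cycle can be sketched with plain asyncio. This assumes a hypothetical agent signature (an async function that receives a private snapshot and returns only its delta); it is not a PyWorkflow API.

```python
import asyncio
import copy
from typing import Any, Awaitable, Callable

# Hypothetical agent signature: takes a snapshot, returns ONLY its changes.
Agent = Callable[[dict[str, Any]], Awaitable[dict[str, Any]]]


async def run_agent(state: dict[str, Any], agent: Agent) -> dict[str, Any]:
    snapshot = copy.deepcopy(state)  # 1. read current state (private copy)
    delta = await agent(snapshot)    # 2-3. act, then return only the changes
    state.update(delta)              # 4. the state graph applies the delta
    return delta


async def agent_a(snapshot: dict[str, Any]) -> dict[str, Any]:
    return {"counter": snapshot["counter"] + 1}


async def agent_b(snapshot: dict[str, Any]) -> dict[str, Any]:
    return {"status": "processing"}


async def agent_c(snapshot: dict[str, Any]) -> dict[str, Any]:
    return {"results": snapshot["results"] + ["done"]}


async def main() -> dict[str, Any]:
    state: dict[str, Any] = {"counter": 0, "status": "pending", "results": []}
    for agent in (agent_a, agent_b, agent_c):
        await run_agent(state, agent)
    return state


final_state = asyncio.run(main())
print(final_state)
```

Because each agent works on a deep copy, it cannot mutate shared state behind the graph's back; the only way in is the returned delta.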

Key Mechanisms:

  1. Append-Only Log: Each delta is appended to the event log; earlier entries are never overwritten
  2. Reducer Functions: Define how each field merges an incoming delta
  3. Immutable State: Each update creates a new version
  4. Channels: Named delta streams (e.g. results, status) that agents publish to
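The first three mechanisms fit together in a small sketch. It assumes reducers are plain two-argument functions (the `ReducerOp` enum in the proposal could map onto these) and uses `types.MappingProxyType` as a shallow read-only view for each version; `VersionedState` and the field names are hypothetical.

```python
import operator
from types import MappingProxyType
from typing import Any, Callable, Mapping

Reducer = Callable[[Any, Any], Any]


def replace(old: Any, new: Any) -> Any:
    return new


# Reducer functions: how each field merges an incoming delta (assumed table).
REDUCERS: dict[str, Reducer] = {
    "findings": operator.add,                  # list + list -> append semantics
    "count": operator.add,                     # int + int -> increment
    "meta": lambda old, new: {**old, **new},   # dict merge
}


class VersionedState:
    """Read-only snapshots per version plus an append-only delta log."""

    def __init__(self, initial: dict[str, Any]) -> None:
        self.versions: list[Mapping[str, Any]] = [MappingProxyType(dict(initial))]
        self.log: list[dict[str, Any]] = []  # entries are only ever appended

    @property
    def current(self) -> Mapping[str, Any]:
        return self.versions[-1]

    def apply(self, delta: dict[str, Any]) -> Mapping[str, Any]:
        nxt = dict(self.current)  # new dict; the previous version is untouched
        for key, value in delta.items():
            reducer = REDUCERS.get(key, replace)
            nxt[key] = reducer(nxt[key], value) if key in nxt else value
        self.log.append(delta)
        self.versions.append(MappingProxyType(nxt))
        return self.current


state = VersionedState({"findings": [], "count": 0, "meta": {}, "status": "pending"})
state.apply({"findings": ["f1"], "count": 1})
state.apply({"findings": ["f2"], "count": 1, "status": "done", "meta": {"by": "B"}})
print(dict(state.current))
```

The reducers here return new objects rather than mutating in place, which is what keeps earlier versions intact despite the views being shallow.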

Proposed PyWorkflow Implementation

PyWorkflow's event sourcing is a natural fit for state deltas, because each recorded event already is a delta:

import asyncio
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Dict, Any, Callable, Optional
from enum import Enum
from pyworkflow import workflow, step, get_context
from pyworkflow.engine.events import Event, EventType

class ReducerOp(str, Enum):
    """How to merge state deltas"""
    REPLACE = "replace"  # Overwrite
    APPEND = "append"    # Add to list
    MERGE = "merge"      # Merge dicts
    INCREMENT = "increment"  # Add numbers

@dataclass
class StateSchema:
    """Shared state schema with reducers"""
    fields: Dict[str, Any] = field(default_factory=dict)
    reducers: Dict[str, ReducerOp] = field(default_factory=dict)
    
    def apply_delta(self, name: str, delta: Any):
        """Apply a delta to one field using that field's reducer"""
        reducer = self.reducers.get(name, ReducerOp.REPLACE)
        
        if reducer == ReducerOp.REPLACE:
            self.fields[name] = delta
        
        elif reducer == ReducerOp.APPEND:
            # Concatenate list deltas (e.g. ["Finding 1"]) instead of nesting them;
            # wrap single items so they are appended as one element
            items = delta if isinstance(delta, list) else [delta]
            self.fields.setdefault(name, []).extend(items)
        
        elif reducer == ReducerOp.MERGE:
            self.fields.setdefault(name, {}).update(delta)
        
        elif reducer == ReducerOp.INCREMENT:
            self.fields[name] = self.fields.get(name, 0) + delta
    
    def get(self, field: str, default=None):
        """Get field value"""
        return self.fields.get(field, default)

class StateGraph:
    """Multi-agent shared state with delta updates"""
    
    def __init__(self, schema: StateSchema):
        self.schema = schema
        self.version = 0
    
    async def update(self, agent_name: str, deltas: Dict[str, Any]):
        """Agent writes delta to shared state"""
        ctx = get_context()
        
        # Apply each delta
        for field, value in deltas.items():
            self.schema.apply_delta(field, value)
        
        # Increment version
        self.version += 1
        
        # Record delta as event (immutable log)
        await ctx.storage.record_event(Event(
            run_id=ctx.run_id,
            type=EventType.STATE_DELTA,
            data={
                "agent": agent_name,
                "version": self.version,
                "deltas": deltas,
                "timestamp": datetime.utcnow().isoformat()
            }
        ))
    
    def get_state(self) -> Dict[str, Any]:
        """Get current state snapshot"""
        return self.schema.fields.copy()

# Example: Multi-agent research workflow
# (research_topic, analyze_findings and write_report are placeholders for your own LLM/tool calls)

@step()
async def researcher_agent(state_graph: StateGraph) -> str:
    """Agent A: Research topic"""
    # Read current state
    current_state = state_graph.get_state()
    topic = current_state.get("topic", "AI agents")
    
    # Perform research
    findings = await research_topic(topic)
    
    # Write ONLY delta (not full state)
    await state_graph.update("researcher", {
        "findings": findings,  # APPEND (reducer)
        "researchers_completed": 1  # INCREMENT (reducer)
    })
    
    return findings

@step()
async def analyzer_agent(state_graph: StateGraph) -> str:
    """Agent B: Analyze findings"""
    # Read current state
    current_state = state_graph.get_state()
    findings = current_state.get("findings", [])
    
    # Analyze
    analysis = await analyze_findings(findings)
    
    # Write ONLY delta
    await state_graph.update("analyzer", {
        "analysis": analysis,  # REPLACE (reducer)
        "analyzers_completed": 1  # INCREMENT (reducer)
    })
    
    return analysis

@step()
async def writer_agent(state_graph: StateGraph) -> str:
    """Agent C: Write report"""
    # Read current state
    current_state = state_graph.get_state()
    findings = current_state.get("findings", [])
    analysis = current_state.get("analysis", "")
    
    # Write report
    report = await write_report(findings, analysis)
    
    # Write ONLY delta
    await state_graph.update("writer", {
        "report": report,  # REPLACE (reducer)
        "writers_completed": 1  # INCREMENT (reducer)
    })
    
    return report

@workflow(durable=True)
async def multi_agent_research(topic: str):
    """Multi-agent workflow with state deltas"""
    # Define shared state schema
    schema = StateSchema(
        fields={"topic": topic},
        reducers={
            "findings": ReducerOp.APPEND,
            "analysis": ReducerOp.REPLACE,
            "report": ReducerOp.REPLACE,
            "researchers_completed": ReducerOp.INCREMENT,
            "analyzers_completed": ReducerOp.INCREMENT,
            "writers_completed": ReducerOp.INCREMENT
        }
    )
    
    state_graph = StateGraph(schema)
    
    # Agents run in sequence (or parallel if independent)
    await researcher_agent(state_graph)
    await analyzer_agent(state_graph)
    await writer_agent(state_graph)
    
    # Return final state
    final_state = state_graph.get_state()
    
    return final_state["report"]

# Advanced: Parallel agents with channels

@dataclass
class Channel:
    """Separate delta stream for agent communication"""
    name: str
    deltas: List[Dict[str, Any]] = field(default_factory=list)
    
    async def send(self, delta: Dict[str, Any]):
        """Send delta to channel"""
        self.deltas.append(delta)
        
        ctx = get_context()
        await ctx.storage.record_event(Event(
            run_id=ctx.run_id,
            type=EventType.CHANNEL_MESSAGE,
            data={
                "channel": self.name,
                "delta": delta
            }
        ))
    
    def receive_all(self) -> List[Dict[str, Any]]:
        """Get all deltas from channel"""
        return self.deltas.copy()

@workflow(durable=True)
async def parallel_agents_with_channels():
    """Multiple agents communicate via channels"""
    # Create channels
    results_channel = Channel(name="results")
    status_channel = Channel(name="status")
    
    # Run agents in parallel (agent_a/b/c are placeholder agent steps)
    await asyncio.gather(
        agent_a(results_channel, status_channel),
        agent_b(results_channel, status_channel),
        agent_c(results_channel, status_channel)
    )
    
    # Collect results from channel
    all_results = results_channel.receive_all()
    
    return all_results
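To try the channel pattern outside a workflow, here is a self-contained sketch. It replaces `ctx.storage.record_event` with an in-memory list (an assumption; there is no PyWorkflow context here), and `demo_agent` is a hypothetical agent.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any

# In-memory stand-in for the durable event log (assumption: no pyworkflow context).
EVENT_LOG: list[dict[str, Any]] = []


@dataclass
class Channel:
    name: str
    deltas: list[dict[str, Any]] = field(default_factory=list)

    async def send(self, delta: dict[str, Any]) -> None:
        self.deltas.append(delta)
        EVENT_LOG.append({"type": "channel_message", "channel": self.name, "delta": delta})

    def receive_all(self) -> list[dict[str, Any]]:
        return self.deltas.copy()


async def demo_agent(name: str, results: Channel, status: Channel) -> None:
    """Hypothetical agent: reports status, 'works', then publishes a result delta."""
    await status.send({"agent": name, "status": "started"})
    await asyncio.sleep(0)  # yield so the agents interleave
    await results.send({"agent": name, "result": f"{name} done"})


async def main() -> list[dict[str, Any]]:
    results, status = Channel("results"), Channel("status")
    await asyncio.gather(*(demo_agent(n, results, status) for n in ("a", "b", "c")))
    return results.receive_all()


all_results = asyncio.run(main())
print(all_results)
```

All three agents share the same channel objects; this is safe here because asyncio runs them on one thread and `send` does not suspend between appending and logging.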

Integration with Event Sourcing

State deltas map 1:1 to PyWorkflow events:

New Event Types:

class EventType(str, Enum):
    # ... existing types
    STATE_DELTA = "state_delta"  # NEW
    CHANNEL_MESSAGE = "channel_message"  # NEW

Event as Delta:

Every state update is an event:

# Agent A updates state
Event(
    type=EventType.STATE_DELTA,
    data={
        "agent": "researcher",
        "version": 1,
        "deltas": {
            "findings": ["Finding 1"],
            "researchers_completed": 1
        }
    }
)

# Agent B updates state
Event(
    type=EventType.STATE_DELTA,
    data={
        "agent": "analyzer",
        "version": 2,
        "deltas": {
            "analysis": "Analysis summary..."
        }
    }
)

State Reconstruction via Event Replay:

On workflow resumption, state is reconstructed by replaying deltas:

# In EventReplayer._apply_event()
elif event.type == EventType.STATE_DELTA:
    for field, value in event.data["deltas"].items():
        ctx.state_graph.schema.apply_delta(field, value)
    ctx.state_graph.version = event.data["version"]
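The replay step can be sketched without an `EventReplayer`. This assumes events are plain dicts shaped like the examples above, that replay starts from an empty state, and that APPEND concatenates list deltas; passing `upto_version` stops early, which is how replaying to an earlier version would work.

```python
from typing import Any, Optional

# Assumed reducer table for this example; unlisted fields use "replace".
REDUCERS = {"findings": "append", "researchers_completed": "increment"}


def apply_delta(state: dict[str, Any], name: str, value: Any) -> None:
    op = REDUCERS.get(name, "replace")
    if op == "append":
        # Assumption: list deltas are concatenated rather than nested
        state.setdefault(name, []).extend(value if isinstance(value, list) else [value])
    elif op == "increment":
        state[name] = state.get(name, 0) + value
    else:
        state[name] = value


def replay(
    events: list[dict[str, Any]], upto_version: Optional[int] = None
) -> tuple[dict[str, Any], int]:
    """Rebuild state from state_delta events, optionally stopping at a version."""
    state: dict[str, Any] = {}
    version = 0
    for event in events:
        if event["type"] != "state_delta":
            continue  # other event types do not touch shared state
        data = event["data"]
        if upto_version is not None and data["version"] > upto_version:
            break
        for name, value in data["deltas"].items():
            apply_delta(state, name, value)
        version = data["version"]
    return state, version


events = [
    {"type": "state_delta", "data": {"agent": "researcher", "version": 1,
        "deltas": {"findings": ["Finding 1"], "researchers_completed": 1}}},
    {"type": "state_delta", "data": {"agent": "analyzer", "version": 2,
        "deltas": {"analysis": "Analysis summary..."}}},
]

print(replay(events))                  # state at the latest version
print(replay(events, upto_version=1))  # state as of version 1
```

Replaying the same log twice yields the same state, which is the determinism the event log relies on.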

Key Advantages:

  1. Events ARE deltas: No separate delta store is needed
  2. Immutable log: Full audit trail of state changes
  3. Deterministic replay: State always reconstructible
  4. Debuggable: See which agent changed what, when
  5. Time-travel: Replay to any version
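The "which agent changed what, when" point can be sketched as a simple query over the log. It assumes the same dict-shaped events as above and treats the version number as "when"; `field_history` is a hypothetical helper.

```python
from collections import defaultdict
from typing import Any


def field_history(events: list[dict[str, Any]]) -> dict[str, list[tuple[int, str]]]:
    """Map each state field to the (version, agent) pairs that changed it, in order."""
    history: dict[str, list[tuple[int, str]]] = defaultdict(list)
    for event in events:
        if event["type"] != "state_delta":
            continue
        data = event["data"]
        for name in data["deltas"]:
            history[name].append((data["version"], data["agent"]))
    return dict(history)


events = [
    {"type": "state_delta", "data": {"agent": "researcher", "version": 1,
        "deltas": {"findings": ["Finding 1"], "researchers_completed": 1}}},
    {"type": "state_delta", "data": {"agent": "analyzer", "version": 2,
        "deltas": {"analysis": "Draft"}}},
    {"type": "state_delta", "data": {"agent": "analyzer", "version": 3,
        "deltas": {"analysis": "Revised", "analyzers_completed": 1}}},
]

history = field_history(events)
print(history["analysis"])  # who changed "analysis", and at which version
```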

Trade-offs

Pros:

  • Lower token usage: Agents exchange deltas, not the full state
  • Efficient multi-agent: Per-agent context stays small as more agents are added
  • Clear ownership: See which agent changed what
  • Deterministic: Reducer functions define merge behavior
  • Debuggable: Event log shows all state transitions

Cons:

  • Less context per agent: Agents don't see full conversation
  • Coordination overhead: Reducer logic must be correct
  • Potential conflicts: Concurrent updates need careful handling
  • Learning curve: More complex than passing full state
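For the concurrent-update risk, one common mitigation is optimistic concurrency control: each delta carries the version it was computed from, and stale writes are rejected and retried. The proposal does not specify this; `CheckedState` and `StaleVersionError` below are hypothetical names in a minimal single-event-loop sketch.

```python
import asyncio
from typing import Any


class StaleVersionError(Exception):
    """Raised when a delta was computed against an outdated state version."""


class CheckedState:
    def __init__(self, fields: dict[str, Any]) -> None:
        self.fields = dict(fields)
        self.version = 0

    def update(self, expected_version: int, deltas: dict[str, Any]) -> int:
        # Compare-and-set: there is no await between the check and the write,
        # so this is atomic within a single asyncio event loop.
        if expected_version != self.version:
            raise StaleVersionError(
                f"delta based on v{expected_version}, state is at v{self.version}"
            )
        self.fields.update(deltas)
        self.version += 1
        return self.version


async def agent(state: CheckedState, name: str) -> int:
    retries = 0
    while True:
        seen = state.version                      # read a version...
        new_log = state.fields["log"] + [name]    # ...and compute a delta from it
        await asyncio.sleep(0)                    # another agent may write here
        try:
            state.update(seen, {"log": new_log})
            return retries
        except StaleVersionError:
            retries += 1                          # re-read and recompute


async def main() -> tuple[CheckedState, list[int]]:
    state = CheckedState({"log": []})
    retries = await asyncio.gather(*(agent(state, n) for n in ("a", "b", "c")))
    return state, retries


final, retries = asyncio.run(main())
print(final.fields, final.version, retries)
```

Without the version check, the three agents would each write a one-element list and only the last would survive; with it, every agent's entry ends up in the log.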

When to Use:

  • Multi-agent systems (3+ agents)
  • Token-sensitive applications
  • Clear separation of concerns (each agent owns fields)
  • Long-running workflows with evolving state

When to Avoid:

  • Single agent (no benefit)
  • Agents need full conversation context
  • Rapid prototyping (simpler to pass full state initially)


Labels: agents (AI Agent module, pyworkflow_agents), feature (Feature to be implemented), memory (Agent memory and context management)
