Agent Pattern: Parallel Agent (Scatter-Gather) #168

@yasha-dev1

Overview

The Parallel Agent (Scatter-Gather) pattern distributes a task to multiple agents simultaneously, collects their independent results, and synthesizes them into a final answer. This pattern is highly effective for tasks that benefit from diverse perspectives or parallel processing.

Performance Impact: Anthropic reports that a multi-agent research system using this scatter-gather style (a lead agent orchestrating parallel subagents) outperformed a single-agent baseline by 90.2% on their internal research evaluation, by combining multiple independent reasoning paths.

How It Works

  1. Scatter: Distribute the same or decomposed task to N agents in parallel
  2. Parallel Execution: Each agent independently processes the task
  3. Gather: Collect all agent results
  4. Synthesis: An aggregator agent (or logic) combines results into final output

Control Flow:

              Task
                |
         [Scatter Phase]
                |
    ┌───────────┼───────────┐
    ↓           ↓           ↓
Agent A      Agent B     Agent C    (Parallel Execution)
(persp. A)   (persp. B)  (persp. C)
    ↓           ↓           ↓
    └───────────┼───────────┘
                |
         [Gather Phase]
                |
           Aggregator
         (synthesize A+B+C)
                |
           Final Result
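The four phases in the diagram can be sketched framework-free with plain asyncio. This is a minimal illustration, not the proposed PyWorkflow API: the stub agents just return strings where a real implementation would call an LLM.

```python
import asyncio

# Stub "agents": in practice each would wrap an LLM call.
async def agent_a(task: str) -> str:
    return f"optimistic view of {task}"

async def agent_b(task: str) -> str:
    return f"critical view of {task}"

async def agent_c(task: str) -> str:
    return f"neutral view of {task}"

async def aggregate(results: list[str]) -> str:
    # Placeholder synthesis: a real aggregator would call an LLM.
    return " | ".join(results)

async def scatter_gather(task: str) -> str:
    # Scatter + parallel execution: all three agents start concurrently.
    results = await asyncio.gather(agent_a(task), agent_b(task), agent_c(task))
    # The await above is the gather phase; synthesis combines the results.
    return await aggregate(list(results))

print(asyncio.run(scatter_gather("market trends")))
```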

Reference Implementations

Proposed PyWorkflow Implementation

from pyworkflow_agents import Agent, ParallelAgentOrchestrator
from pyworkflow_agents.providers import AnthropicProvider
from pyworkflow import workflow, step
import asyncio

# Define parallel agents with different perspectives
perspective_a = Agent(
    name="optimistic_analyst",
    provider=AnthropicProvider(model="claude-sonnet-4-5-20250929"),
    instructions="Analyze the problem with an optimistic lens. Focus on opportunities.",
)

perspective_b = Agent(
    name="critical_analyst",
    provider=AnthropicProvider(model="claude-sonnet-4-5-20250929"),
    instructions="Analyze the problem critically. Identify risks and challenges.",
)

perspective_c = Agent(
    name="neutral_analyst",
    provider=AnthropicProvider(model="claude-sonnet-4-5-20250929"),
    instructions="Provide a balanced, neutral analysis of the problem.",
)

# Aggregator agent
aggregator = Agent(
    name="synthesizer",
    provider=AnthropicProvider(model="claude-opus-4-6"),  # More powerful model
    instructions="Synthesize multiple perspectives into a comprehensive analysis.",
)

# Method 1: Using PyWorkflow's parallel step execution
@workflow(durable=True)
async def scatter_gather_workflow(task: str):
    """
    Scatter task to parallel agents using PyWorkflow's Celery workers.
    Each agent runs as a step on a different Celery worker.
    """
    # Scatter: Execute agents in parallel on Celery step workers
    results = await asyncio.gather(
        run_agent_as_step(perspective_a, task),
        run_agent_as_step(perspective_b, task),
        run_agent_as_step(perspective_c, task),
    )
    
    # Gather: Collect results
    gathered_results = {
        "optimistic": results[0],
        "critical": results[1],
        "neutral": results[2],
    }
    
    # Synthesize: Aggregate results
    final_result = await run_agent_as_step(
        aggregator,
        f"Synthesize these perspectives:\n{gathered_results}"
    )
    
    return final_result

@step()
async def run_agent_as_step(agent: Agent, task: str):
    """
    Execute an agent as a PyWorkflow step (runs on Celery worker).
    """
    response = await agent.run(task)
    return response.content

# Method 2: Using ParallelAgentOrchestrator
orchestrator = ParallelAgentOrchestrator(
    agents=[perspective_a, perspective_b, perspective_c],
    aggregator=aggregator,
    timeout_per_agent="5m",
    aggregation_strategy="synthesis",  # Options: "synthesis", "voting", "weighted"
)

@workflow(durable=True)
async def orchestrated_scatter_gather(task: str):
    """
    Use ParallelAgentOrchestrator for cleaner scatter-gather.
    """
    result = await orchestrator.run(task)
    return result

# Method 3: Dynamic decomposition (advanced)
@workflow(durable=True)
async def dynamic_scatter_gather(complex_task: str):
    """
    Decompose task into subtasks, scatter to specialists.
    """
    # Step 1: Decompose task into subtasks
    subtasks = await decompose_task_as_step(complex_task)
    
    # Step 2: Scatter subtasks to parallel specialist agents
    results = await asyncio.gather(*[
        run_specialist_as_step(subtask)
        for subtask in subtasks
    ])
    
    # Step 3: Synthesize results
    final_result = await synthesize_results_as_step(results)
    return final_result

@step()
async def decompose_task_as_step(task: str):
    """Decompose complex task into independent subtasks."""
    decomposer = Agent(...)
    result = await decomposer.run(f"Break down this task: {task}")
    return result.subtasks  # List of subtasks

@step()
async def run_specialist_as_step(subtask: dict):
    """Run specialist agent for a subtask."""
    specialist = get_specialist_for_subtask(subtask)
    return await specialist.run(subtask["description"])

@step()
async def synthesize_results_as_step(results: list):
    """Synthesize parallel results."""
    return await aggregator.run(f"Synthesize: {results}")

Key Mapping to PyWorkflow Primitives:

  • Scatter = asyncio.gather() with multiple @step calls
  • Parallel execution = Each agent step runs on a different Celery worker
  • Gather = Await all steps, collect results
  • Synthesis = Final step that aggregates results
  • Parallelization = PyWorkflow's existing parallel step execution (Celery workers)

Event Types

New events for scatter-gather pattern:

class EventType(str, Enum):
    # Existing events...
    SCATTER_START = "scatter_start"              # Begin scatter phase
    AGENT_PARALLEL_START = "agent_parallel_start"  # Individual agent starts
    AGENT_PARALLEL_COMPLETE = "agent_parallel_complete"  # Individual agent completes
    GATHER_COMPLETE = "gather_complete"          # All agents gathered
    SYNTHESIS_START = "synthesis_start"          # Begin synthesis
    SYNTHESIS_COMPLETE = "synthesis_complete"    # Synthesis complete

Event Data Schema:

# SCATTER_START
{
    "num_agents": 3,
    "agent_names": ["optimistic_analyst", "critical_analyst", "neutral_analyst"],
    "task": "Analyze market trends",
    "timeout_per_agent": "5m"
}

# AGENT_PARALLEL_START
{
    "agent_name": "optimistic_analyst",
    "step_id": "step_agent_001",
    "worker_id": "celery@worker-2"
}

# AGENT_PARALLEL_COMPLETE
{
    "agent_name": "optimistic_analyst",
    "result_summary": "Identified 5 growth opportunities",
    "execution_time_ms": 3200
}

# GATHER_COMPLETE
{
    "total_agents": 3,
    "successful": 3,
    "failed": 0,
    "total_time_ms": 4100,  # Max of all parallel executions
    "gathered_results": ["optimistic", "critical", "neutral"]
}

# SYNTHESIS_COMPLETE
{
    "aggregation_strategy": "synthesis",
    "input_results_count": 3,
    "final_result_summary": "Balanced analysis with opportunities and risks",
    "synthesis_time_ms": 2800
}
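To illustrate where each event would fire, here is a sketch that records events into a plain list in place of PyWorkflow's real event bus. The `emit` helper and `traced_*` functions are illustrative names, not part of any existing API.

```python
import asyncio
import time
from enum import Enum

class EventType(str, Enum):
    SCATTER_START = "scatter_start"
    AGENT_PARALLEL_START = "agent_parallel_start"
    AGENT_PARALLEL_COMPLETE = "agent_parallel_complete"
    GATHER_COMPLETE = "gather_complete"
    SYNTHESIS_START = "synthesis_start"
    SYNTHESIS_COMPLETE = "synthesis_complete"

events: list[tuple[EventType, dict]] = []

def emit(event: EventType, data: dict) -> None:
    # Stand-in for a real event bus; here we just record events in order.
    events.append((event, data))

async def traced_agent(name: str, task: str) -> str:
    emit(EventType.AGENT_PARALLEL_START, {"agent_name": name})
    start = time.monotonic()
    result = f"{name} result for {task}"  # placeholder for agent.run(task)
    emit(EventType.AGENT_PARALLEL_COMPLETE, {
        "agent_name": name,
        "execution_time_ms": int((time.monotonic() - start) * 1000),
    })
    return result

async def traced_scatter_gather(task: str, agent_names: list[str]) -> str:
    emit(EventType.SCATTER_START, {"num_agents": len(agent_names), "task": task})
    results = await asyncio.gather(*(traced_agent(n, task) for n in agent_names))
    emit(EventType.GATHER_COMPLETE, {"total_agents": len(results), "failed": 0})
    emit(EventType.SYNTHESIS_START, {"input_results_count": len(results)})
    final = " | ".join(results)  # placeholder synthesis
    emit(EventType.SYNTHESIS_COMPLETE, {"aggregation_strategy": "synthesis"})
    return final
```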

Trade-offs

Pros

  • Performance: up to 90.2% improvement over a single-agent baseline on research tasks (Anthropic's internal evaluation)
  • Diverse perspectives: Multiple independent reasoning paths reduce bias
  • Fault tolerance: Other agents continue if one fails
  • Scalability: Easily add more parallel agents (scale Celery workers)
  • Natural fit: PyWorkflow's Celery workers designed for this pattern
  • No orchestration overhead: No supervisor routing decisions (vs supervisor pattern)

Cons

  • Cost: N parallel LLM calls instead of 1
  • Aggregation complexity: Synthesizing diverse results is non-trivial
  • Wasted work: All agents do full work even if one result would suffice
  • Context size: Aggregator must process N agent outputs
  • Timeout management: Slow agents delay entire gather phase

When to Use

  • Research tasks requiring multiple perspectives
  • Complex, parallelizable problem-solving (where Anthropic measured the 90.2% gain)
  • Tasks where diversity improves quality (code review, analysis)
  • I/O-bound tasks (API calls, database queries, file operations)
  • Need fault tolerance (one agent failure doesn't fail entire task)

When to Avoid

  • Simple tasks where single agent suffices
  • Cost-sensitive applications (N LLM calls expensive)
  • Low-latency requirements (gather phase has max latency of slowest agent)
  • Tasks requiring sequential dependencies (use supervisor or swarm)

Aggregation Strategies

1. Synthesis (Recommended)

Use an aggregator agent (LLM) to synthesize results:

aggregator = Agent(
    instructions="Synthesize multiple perspectives into a coherent analysis."
)
  • Pros: Intelligent combination, handles contradictions
  • Cons: Extra LLM call, slower

2. Voting

Agents vote on answer, majority wins:

orchestrator = ParallelAgentOrchestrator(
    aggregation_strategy="voting"
)
  • Pros: Fast, no extra LLM call
  • Cons: Binary choices only, loses nuance
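A minimal voting aggregator needs no LLM at all. As a sketch (this is not the orchestrator's internal code), `collections.Counter` gives majority voting with ties broken in favor of the answer seen first:

```python
from collections import Counter

def vote(results: list[str]) -> str:
    """Majority vote over categorical agent answers.

    Counter.most_common orders equal counts by first occurrence,
    so ties break in favor of the earliest answer.
    """
    winner, _ = Counter(results).most_common(1)[0]
    return winner

print(vote(["yes", "no", "yes"]))  # -> yes
```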

3. Weighted Average

Weight agent results by confidence or past accuracy:

orchestrator = ParallelAgentOrchestrator(
    aggregation_strategy="weighted",
    weights={"agent_a": 0.5, "agent_b": 0.3, "agent_c": 0.2}
)
  • Pros: Incorporates agent reliability
  • Cons: Requires numeric results, need weight tuning
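For numeric outputs, the weighted strategy reduces to a weighted mean. A sketch, assuming each agent returns a confidence-style score and the weights match the configuration above:

```python
def weighted_average(results: dict[str, float], weights: dict[str, float]) -> float:
    """Combine numeric agent outputs, weighting each by its reliability."""
    total_weight = sum(weights[name] for name in results)
    return sum(results[name] * weights[name] for name in results) / total_weight

score = weighted_average(
    {"agent_a": 0.9, "agent_b": 0.6, "agent_c": 0.3},
    {"agent_a": 0.5, "agent_b": 0.3, "agent_c": 0.2},
)
# 0.9*0.5 + 0.6*0.3 + 0.3*0.2 = 0.69
```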

4. Consensus Selection

Select result with highest agreement:

orchestrator = ParallelAgentOrchestrator(
    aggregation_strategy="consensus"
)
  • Pros: Reduces outliers
  • Cons: May miss innovative minority views
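One simple way to approximate consensus selection, shown here as an illustrative sketch rather than the orchestrator's actual algorithm, is to pick the result with the highest total textual similarity to its peers:

```python
from difflib import SequenceMatcher

def consensus_pick(results: list[str]) -> str:
    """Return the result that agrees most with the other results."""
    def agreement(candidate: str) -> float:
        # Sum of pairwise similarity ratios against all other results.
        others = [r for r in results if r is not candidate]
        return sum(SequenceMatcher(None, candidate, o).ratio() for o in others)
    return max(results, key=agreement)
```

Two near-identical answers reinforce each other, so an outlier like a hallucinated result scores low and is never selected.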

Performance Best Practices

Based on AWS and Anthropic research:

  1. Asyncio for low latency: Use asyncio.gather() for Python concurrency
  2. Design for timeouts: One slow agent shouldn't block entire gather
  3. Handle failures gracefully: Continue with partial results if some agents fail
  4. Use appropriate aggregator: Match strategy to task type
  5. Scale Celery workers: Ensure enough workers for parallel execution
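Points 1-3 above combine in a few lines: wrap each agent call in `asyncio.wait_for` and pass `return_exceptions=True` to `asyncio.gather`, so a slow or failed agent yields a partial result set instead of failing the whole gather. The agents below are simulated with `asyncio.sleep`; delays and names are illustrative.

```python
import asyncio

async def call_agent(name: str, delay: float, task: str) -> str:
    # Simulated agent; `delay` stands in for LLM latency.
    await asyncio.sleep(delay)
    return f"{name}: analysis of {task}"

async def gather_with_timeout(task: str, timeout: float = 0.5) -> list[str]:
    agents = [("fast_a", 0.01), ("fast_b", 0.02), ("slow_c", 5.0)]
    coros = [
        asyncio.wait_for(call_agent(name, delay, task), timeout)
        for name, delay in agents
    ]
    # return_exceptions=True keeps partial results when agents time out or fail.
    outcomes = await asyncio.gather(*coros, return_exceptions=True)
    return [r for r in outcomes if isinstance(r, str)]

results = asyncio.run(gather_with_timeout("market trends"))
# slow_c exceeds the timeout, so only the two fast agents contribute.
```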

Implementation Checklist

  • Create ParallelAgentOrchestrator class in pyworkflow_agents/parallel.py
  • Implement aggregation strategies: synthesis, voting, weighted, consensus
  • Add SCATTER_*, GATHER_*, SYNTHESIS_* event types
  • Integrate with PyWorkflow's parallel step execution (Celery)
  • Add timeout handling per agent
  • Implement partial result handling (some agents fail)
  • Add worker load balancing (ensure enough Celery workers)
  • Create run_agent_as_step() helper
  • Add metrics: parallel speedup, agent agreement, synthesis quality
  • Create examples in examples/agents/scatter_gather_pattern.py
  • Add tests for all aggregation strategies
  • Document cost implications (N LLM calls)
  • Add visualization of parallel execution timeline
