diff --git a/.claude/templates/coding_prompt.template.md b/.claude/templates/coding_prompt.template.md
index 6da10a2f..c48680b4 100644
--- a/.claude/templates/coding_prompt.template.md
+++ b/.claude/templates/coding_prompt.template.md
@@ -3,6 +3,20 @@
You are continuing work on a long-running autonomous development task.
This is a FRESH context window - you have no memory of previous sessions.
+### PARALLEL AGENT MODE
+
+Check if you're running as a parallel agent:
+
+```bash
+echo "AGENT_ID: $AGENT_ID"
+```
+
+**If AGENT_ID is set (e.g., "agent-1", "agent-2"):**
+- You are one of multiple agents working in parallel on this project
+- You MUST use `feature_claim_next` instead of `feature_get_next` to avoid conflicts
+- Other agents are working in separate git worktrees on the same codebase
+- The features database is shared - atomic claiming prevents race conditions
+
### STEP 1: GET YOUR BEARINGS (MANDATORY)
Start by orienting yourself:
@@ -96,6 +110,17 @@ Features are **test cases** that drive development. This is test-driven developm
Get the next feature to implement:
+**If running in PARALLEL MODE (AGENT_ID is set):**
+
+```
+# Atomically claim the next feature (prevents race conditions with other agents)
+Use the feature_claim_next tool with agent_id=$AGENT_ID
+```
+
+This single call gets AND claims the feature atomically - no need to call `feature_mark_in_progress` separately.
+
+**If running in SINGLE AGENT MODE (AGENT_ID is not set):**
+
```
# Get the highest-priority pending feature
Use the feature_get_next tool
@@ -273,7 +298,7 @@ Use the feature_mark_passing tool with feature_id=42
**ONLY MARK A FEATURE AS PASSING AFTER VERIFICATION WITH SCREENSHOTS.**
-### STEP 8: COMMIT YOUR PROGRESS
+### STEP 8: COMMIT AND MERGE YOUR PROGRESS
Make a descriptive git commit:
@@ -284,10 +309,41 @@ git commit -m "Implement [feature name] - verified end-to-end
- Added [specific changes]
- Tested with browser automation
- Marked feature #X as passing
-- Screenshots in verification/ directory
"
```
+**If running in PARALLEL MODE (AGENT_ID is set), merge your changes immediately:**
+
+After each completed feature, integrate your changes so other agents can benefit.
+
+**IMPORTANT:** You're in a git worktree, so you cannot checkout the main branch directly.
+Use this worktree-safe approach:
+
+```bash
+# 1. Determine the main branch name
+MAIN_BRANCH=$(git -C "$(git rev-parse --git-common-dir)/.." branch --show-current 2>/dev/null); MAIN_BRANCH=${MAIN_BRANCH:-main}
+
+# 2. Pull latest main into your branch (get others' changes first)
+git fetch origin $MAIN_BRANCH 2>/dev/null || true
+git merge origin/$MAIN_BRANCH --no-edit 2>/dev/null || git merge FETCH_HEAD --no-edit 2>/dev/null || true
+
+# 3. Push your branch to main (worktree-safe merge)
+git push . HEAD:$MAIN_BRANCH
+```
+
+The `git push . HEAD:$MAIN_BRANCH` command fast-forwards the main branch to your current commit without needing to check out main (it fails if main has diverged — see conflict handling below).
+
+**If merge conflicts occur:**
+- On step 2 (pulling main): Resolve conflicts, then `git add . && git commit -m "Merge main"`
+- On step 3 (pushing to main): This means main has changes not in your branch
+ - Run step 2 again to get those changes, then retry step 3
+- If complex conflicts: Skip merging for now, continue working. Use UI "Merge" button later.
+
+**Why merge after each feature?**
+- Other agents immediately get your improvements
+- Prevents large conflict pile-ups at the end
+- Each agent stays closer to the "true" state of the project
+
### STEP 9: UPDATE PROGRESS NOTES
Update `claude-progress.txt` with:
@@ -369,9 +425,13 @@ The feature tools exist to reduce token usage. **DO NOT make exploratory queries
feature_get_stats
# 2. Get the NEXT feature to work on (one feature only)
+# - SINGLE AGENT MODE: Use feature_get_next
+# - PARALLEL MODE: Use feature_claim_next with agent_id=$AGENT_ID (atomic claim)
feature_get_next
+feature_claim_next with agent_id={agent_id} # PARALLEL MODE ONLY
# 3. Mark a feature as in-progress (call immediately after feature_get_next)
+# NOTE: Skip this if you used feature_claim_next (it claims automatically)
feature_mark_in_progress with feature_id={id}
# 4. Get up to 3 random passing features for regression testing
@@ -385,6 +445,9 @@ feature_skip with feature_id={id}
# 7. Clear in-progress status (when abandoning a feature)
feature_clear_in_progress with feature_id={id}
+
+# 8. Release a feature back to the queue (PARALLEL MODE - if you can't complete it)
+feature_release with feature_id={id} agent_id={agent_id}
```
### RULES:
diff --git a/.gitignore b/.gitignore
index d14182cd..d5b4c9f1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -98,6 +98,7 @@ Desktop.ini
ui/dist/
ui/.vite/
.vite/
+*.tsbuildinfo
# ===================
# Environment files
diff --git a/CLAUDE.md b/CLAUDE.md
index 51c09493..43aa700e 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -71,6 +71,42 @@ python autonomous_agent_demo.py --project-dir my-app --yolo
**When to use:** Early prototyping when you want to quickly scaffold features without verification overhead. Switch back to standard mode for production-quality development.
+### Parallel Agents Mode
+
+Run multiple agents simultaneously to speed up project completion:
+
+```bash
+# CLI: Run 3 agents in parallel
+python autonomous_agent_demo.py --project-dir my-app --num-agents 3
+
+# Combine with YOLO mode for fastest iteration
+python autonomous_agent_demo.py --project-dir my-app --num-agents 3 --yolo
+
+# API: Start parallel agents via REST
+POST /api/projects/{project_name}/parallel-agents/start
+Body: { "num_agents": 3, "yolo_mode": false }
+```
+
+**How it works:**
+- Each agent gets its own **git worktree** for code isolation
+- All agents share the same **features.db** for task coordination
+- Features are **atomically claimed** to prevent conflicts
+- Agent assignment is tracked via `assigned_agent_id` on features
+
+**API Endpoints:**
+- `GET /api/projects/{name}/parallel-agents/status` - Get status of all agents
+- `POST /api/projects/{name}/parallel-agents/start` - Start N agents
+- `POST /api/projects/{name}/parallel-agents/stop` - Stop all agents
+- `POST /api/projects/{name}/parallel-agents/merge` - Merge worktree changes
+- `POST /api/projects/{name}/parallel-agents/cleanup` - Stop and cleanup
+
+**MCP Tools for Parallel Agents:**
+- `feature_claim_next(agent_id)` - Atomically claim next available feature
+- `feature_release(feature_id, agent_id)` - Release feature back to queue
+- `feature_get_next(agent_id)` - Get next feature (excludes others' assignments; auto-claims the feature when an agent_id is present)
+
+**When to use:** When you have many independent features and want to parallelize implementation. Best combined with YOLO mode for maximum speed.
+
### React UI (in ui/ directory)
```bash
diff --git a/agent.py b/agent.py
index e4d0de49..cf6e9f62 100644
--- a/agent.py
+++ b/agent.py
@@ -111,6 +111,7 @@ async def run_autonomous_agent(
model: str,
max_iterations: Optional[int] = None,
yolo_mode: bool = False,
+ agent_id: Optional[str] = None,
) -> None:
"""
Run the autonomous agent loop.
@@ -120,20 +121,28 @@ async def run_autonomous_agent(
model: Claude model to use
max_iterations: Maximum number of iterations (None for unlimited)
yolo_mode: If True, skip browser testing and use YOLO prompt
+ agent_id: Optional agent identifier for parallel execution
"""
+ prefix = f"[{agent_id}] " if agent_id else ""
+
print("\n" + "=" * 70)
- print(" AUTONOMOUS CODING AGENT DEMO")
+ if agent_id:
+ print(f" AUTONOMOUS CODING AGENT - {agent_id}")
+ else:
+ print(" AUTONOMOUS CODING AGENT DEMO")
print("=" * 70)
- print(f"\nProject directory: {project_dir}")
- print(f"Model: {model}")
+ print(f"\n{prefix}Project directory: {project_dir}")
+ print(f"{prefix}Model: {model}")
+ if agent_id:
+ print(f"{prefix}Agent ID: {agent_id}")
if yolo_mode:
- print("Mode: YOLO (testing disabled)")
+ print(f"{prefix}Mode: YOLO (testing disabled)")
else:
- print("Mode: Standard (full testing)")
+ print(f"{prefix}Mode: Standard (full testing)")
if max_iterations:
- print(f"Max iterations: {max_iterations}")
+ print(f"{prefix}Max iterations: {max_iterations}")
else:
- print("Max iterations: Unlimited (will run until completion)")
+ print(f"{prefix}Max iterations: Unlimited (will run until completion)")
print()
# Create project directory
@@ -175,7 +184,7 @@ async def run_autonomous_agent(
print_session_header(iteration, is_first_run)
# Create client (fresh context)
- client = create_client(project_dir, model, yolo_mode=yolo_mode)
+ client = create_client(project_dir, model, yolo_mode=yolo_mode, agent_id=agent_id)
# Choose prompt based on session type
# Pass project_dir to enable project-specific prompts
diff --git a/api/database.py b/api/database.py
index a74b857a..4e601800 100644
--- a/api/database.py
+++ b/api/database.py
@@ -29,6 +29,7 @@ class Feature(Base):
steps = Column(JSON, nullable=False) # Stored as JSON array
passes = Column(Boolean, default=False, index=True)
in_progress = Column(Boolean, default=False, index=True)
+ assigned_agent_id = Column(String(50), nullable=True, index=True) # Agent working on this feature
def to_dict(self) -> dict:
"""Convert feature to dictionary for JSON serialization."""
@@ -41,6 +42,7 @@ def to_dict(self) -> dict:
"steps": self.steps,
"passes": self.passes,
"in_progress": self.in_progress,
+ "assigned_agent_id": self.assigned_agent_id,
}
@@ -73,6 +75,21 @@ def _migrate_add_in_progress_column(engine) -> None:
conn.commit()
+def _migrate_add_assigned_agent_id_column(engine) -> None:
+ """Add assigned_agent_id column to existing databases that don't have it."""
+ from sqlalchemy import text
+
+ with engine.connect() as conn:
+ # Check if column exists
+ result = conn.execute(text("PRAGMA table_info(features)"))
+ columns = [row[1] for row in result.fetchall()]
+
+ if "assigned_agent_id" not in columns:
+ # Add the column (nullable, no default)
+ conn.execute(text("ALTER TABLE features ADD COLUMN assigned_agent_id VARCHAR(50)"))
+ conn.commit()
+
+
def create_database(project_dir: Path) -> tuple:
"""
Create database and return engine + session maker.
@@ -87,8 +104,9 @@ def create_database(project_dir: Path) -> tuple:
engine = create_engine(db_url, connect_args={"check_same_thread": False})
Base.metadata.create_all(bind=engine)
- # Migrate existing databases to add in_progress column
+ # Migrate existing databases to add new columns
_migrate_add_in_progress_column(engine)
+ _migrate_add_assigned_agent_id_column(engine)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
return engine, SessionLocal
diff --git a/autonomous_agent_demo.py b/autonomous_agent_demo.py
index f240cc28..fa88afea 100644
--- a/autonomous_agent_demo.py
+++ b/autonomous_agent_demo.py
@@ -19,6 +19,9 @@
# YOLO mode: rapid prototyping without browser testing
python autonomous_agent_demo.py --project-dir my-app --yolo
+
+ # Parallel agents: run multiple agents simultaneously
+ python autonomous_agent_demo.py --project-dir my-app --num-agents 3
"""
import argparse
@@ -95,6 +98,13 @@ def parse_args() -> argparse.Namespace:
help="Enable YOLO mode: rapid prototyping without browser testing",
)
+ parser.add_argument(
+ "--num-agents",
+ type=int,
+ default=1,
+ help="Number of parallel agents to run (default: 1, max: 10)",
+ )
+
return parser.parse_args()
@@ -126,22 +136,92 @@ def main() -> None:
print("Use an absolute path or register the project first.")
return
- try:
- # Run the agent (MCP server handles feature database)
- asyncio.run(
- run_autonomous_agent(
- project_dir=project_dir,
- model=args.model,
- max_iterations=args.max_iterations,
- yolo_mode=args.yolo,
- )
+ # Check if parallel mode requested
+ num_agents = min(args.num_agents, 10) # Cap at 10 agents
+
+ if num_agents > 1:
+ # Parallel agent mode
+ from parallel_agents import ParallelAgentOrchestrator
+
+ print(f"\n{'=' * 70}")
+ print(f" PARALLEL AGENT MODE - {num_agents} AGENTS")
+ print("=" * 70)
+ print(f"\nProject directory: {project_dir}")
+ print(f"Model: {args.model}")
+ print(f"Number of agents: {num_agents}")
+ if args.yolo:
+ print("Mode: YOLO (testing disabled)")
+ print()
+
+ root_dir = Path(__file__).parent
+ orchestrator = ParallelAgentOrchestrator(
+ project_dir=project_dir,
+ root_dir=root_dir,
+ max_agents=num_agents,
)
- except KeyboardInterrupt:
- print("\n\nInterrupted by user")
- print("To resume, run the same command again")
- except Exception as e:
- print(f"\nFatal error: {e}")
- raise
+
+ async def run_parallel():
+ """Run multiple agents in parallel and wait for completion."""
+ try:
+ results = await orchestrator.start_agents(
+ num_agents=num_agents,
+ yolo_mode=args.yolo,
+ model=args.model,
+ max_iterations=args.max_iterations,
+ )
+ print(f"\nStarted agents: {results}")
+
+ # Wait for all agents to complete (or user interrupt)
+ while True:
+ health = await orchestrator.healthcheck()
+ running = sum(1 for v in health.values() if v)
+ if running == 0:
+ # Distinguish between completion and crashes
+ statuses = orchestrator.get_all_statuses()
+ crashed = [s["agent_id"] for s in statuses if s["status"] == "crashed"]
+ stopped = [s["agent_id"] for s in statuses if s["status"] == "stopped"]
+
+ print("\nAll agents have finished.")
+ if crashed:
+ print(f" Crashed: {', '.join(crashed)}")
+ if stopped:
+ print(f" Completed: {', '.join(stopped)}")
+ break
+ await asyncio.sleep(5)
+
+ except KeyboardInterrupt:
+ print("\n\nInterrupted - stopping all agents...")
+ await orchestrator.stop_all_agents()
+ finally:
+ # Optional: merge worktree changes
+ print("\nCleaning up worktrees...")
+ await orchestrator.cleanup()
+
+ try:
+ asyncio.run(run_parallel())
+ except KeyboardInterrupt:
+ print("\n\nInterrupted by user")
+ except Exception as e:
+ print(f"\nFatal error: {e}")
+ raise
+ else:
+ # Single agent mode (original behavior)
+ try:
+ # Run the agent (MCP server handles feature database)
+ asyncio.run(
+ run_autonomous_agent(
+ project_dir=project_dir,
+ model=args.model,
+ max_iterations=args.max_iterations,
+ yolo_mode=args.yolo,
+ )
+ )
+ except KeyboardInterrupt:
+ print("\n\nInterrupted by user")
+ print("To resume, run the same command again")
+ except Exception as e:
+ print(f"\nFatal error: {e}")
+ raise
if __name__ == "__main__":
diff --git a/client.py b/client.py
index 0f68e5ef..b2aca712 100644
--- a/client.py
+++ b/client.py
@@ -25,6 +25,9 @@
"mcp__features__feature_mark_passing",
"mcp__features__feature_skip",
"mcp__features__feature_create_bulk",
+ "mcp__features__feature_claim_next", # Atomic claim for parallel agents
+ "mcp__features__feature_release", # Release feature back to queue
+ "mcp__features__feature_clear_in_progress",
]
# Playwright MCP tools for browser automation
@@ -73,7 +76,12 @@
]
-def create_client(project_dir: Path, model: str, yolo_mode: bool = False):
+def create_client(
+ project_dir: Path,
+ model: str,
+ yolo_mode: bool = False,
+ agent_id: str | None = None,
+):
"""
Create a Claude Agent SDK client with multi-layered security.
@@ -81,6 +89,7 @@ def create_client(project_dir: Path, model: str, yolo_mode: bool = False):
project_dir: Directory for the project
model: Claude model to use
yolo_mode: If True, skip Playwright MCP server for rapid prototyping
+ agent_id: Optional agent identifier for parallel execution
Returns:
Configured ClaudeSDKClient (from claude_agent_sdk)
@@ -149,6 +158,8 @@ def create_client(project_dir: Path, model: str, yolo_mode: bool = False):
else:
print(" - MCP servers: playwright (browser), features (database)")
print(" - Project settings enabled (skills, commands, CLAUDE.md)")
+ if agent_id:
+ print(f" - Parallel mode: Agent ID = {agent_id}")
print()
# Use system Claude CLI instead of bundled one (avoids Bun runtime crash on Windows)
@@ -159,17 +170,23 @@ def create_client(project_dir: Path, model: str, yolo_mode: bool = False):
print(" - Warning: System Claude CLI not found, using bundled CLI")
# Build MCP servers config - features is always included, playwright only in standard mode
+ mcp_env = {
+ # Inherit parent environment (PATH, ANTHROPIC_API_KEY, etc.)
+ **os.environ,
+ # Add custom variables
+ "PROJECT_DIR": str(project_dir.resolve()),
+ "PYTHONPATH": str(Path(__file__).parent.resolve()),
+ }
+
+ # Add agent_id for parallel execution
+ if agent_id:
+ mcp_env["AGENT_ID"] = agent_id
+
mcp_servers = {
"features": {
"command": sys.executable, # Use the same Python that's running this script
"args": ["-m", "mcp_server.feature_mcp"],
- "env": {
- # Inherit parent environment (PATH, ANTHROPIC_API_KEY, etc.)
- **os.environ,
- # Add custom variables
- "PROJECT_DIR": str(project_dir.resolve()),
- "PYTHONPATH": str(Path(__file__).parent.resolve()),
- },
+ "env": mcp_env,
},
}
if not yolo_mode:
diff --git a/mcp_server/feature_mcp.py b/mcp_server/feature_mcp.py
index 8c5f3c83..ef0ce903 100644
--- a/mcp_server/feature_mcp.py
+++ b/mcp_server/feature_mcp.py
@@ -142,27 +142,52 @@ def feature_get_stats() -> str:
@mcp.tool()
-def feature_get_next() -> str:
+def feature_get_next(
+ agent_id: Annotated[str, Field(default="", description="Optional agent ID to filter out features being worked on by other agents")] = ""
+) -> str:
"""Get the highest-priority pending feature to work on.
- Returns the feature with the lowest priority number that has passes=false.
+ Returns the feature with the lowest priority number that has passes=false
+ and is not currently being worked on by another agent.
Use this at the start of each coding session to determine what to implement next.
+ NOTE: In parallel agent mode (AGENT_ID env var set), this automatically claims
+ the feature to prevent race conditions with other agents.
+
+ Args:
+ agent_id: Optional agent ID. If provided, excludes features assigned to other agents.
+ Auto-detected from AGENT_ID environment variable if not provided.
+
Returns:
- JSON with feature details (id, priority, category, name, description, steps, passes, in_progress)
- or error message if all features are passing.
+ JSON with feature details (id, priority, category, name, description, steps, passes, in_progress, assigned_agent_id)
+ or error message if all features are passing or assigned.
"""
+ # Auto-detect agent_id from environment if not provided
+ if not agent_id:
+ agent_id = os.environ.get("AGENT_ID", "")
+
session = get_session()
try:
- feature = (
- session.query(Feature)
- .filter(Feature.passes == False)
- .order_by(Feature.priority.asc(), Feature.id.asc())
- .first()
- )
+ query = session.query(Feature).filter(Feature.passes == False)
+
+ # If agent_id provided, exclude features assigned to other agents
+ if agent_id:
+ query = query.filter(
+ (Feature.assigned_agent_id == None) |
+ (Feature.assigned_agent_id == agent_id)
+ )
+
+ feature = query.order_by(Feature.priority.asc(), Feature.id.asc()).first()
if feature is None:
- return json.dumps({"error": "All features are passing! No more work to do."})
+ return json.dumps({"error": "All features are passing or assigned to other agents! No more work to do."})
+
+ # In parallel mode, automatically claim the feature to prevent race conditions
+ if agent_id and not feature.in_progress:
+ feature.in_progress = True
+ feature.assigned_agent_id = agent_id
+ session.commit()
+ session.refresh(feature)
return json.dumps(feature.to_dict(), indent=2)
finally:
@@ -209,8 +234,9 @@ def feature_mark_passing(
) -> str:
"""Mark a feature as passing after successful implementation.
- Updates the feature's passes field to true and clears the in_progress flag.
- Use this after you have implemented the feature and verified it works correctly.
+ Updates the feature's passes field to true and clears the in_progress flag
+ and agent assignment. Use this after you have implemented the feature and
+ verified it works correctly.
Args:
feature_id: The ID of the feature to mark as passing
@@ -227,6 +253,7 @@ def feature_mark_passing(
feature.passes = True
feature.in_progress = False
+ feature.assigned_agent_id = None # Clear agent assignment on completion
session.commit()
session.refresh(feature)
@@ -290,7 +317,8 @@ def feature_skip(
@mcp.tool()
def feature_mark_in_progress(
- feature_id: Annotated[int, Field(description="The ID of the feature to mark as in-progress", ge=1)]
+ feature_id: Annotated[int, Field(description="The ID of the feature to mark as in-progress", ge=1)],
+ agent_id: Annotated[str, Field(default="", description="Optional agent ID to assign this feature to")] = ""
) -> str:
"""Mark a feature as in-progress. Call immediately after feature_get_next().
@@ -299,9 +327,10 @@ def feature_mark_in_progress(
Args:
feature_id: The ID of the feature to mark as in-progress
+ agent_id: Optional agent ID to assign this feature to
Returns:
- JSON with the updated feature details, or error if not found or already in-progress.
+ JSON with the updated feature details, or error if not found or already in-progress by another agent.
"""
session = get_session()
try:
@@ -313,10 +342,16 @@ def feature_mark_in_progress(
if feature.passes:
return json.dumps({"error": f"Feature with ID {feature_id} is already passing"})
- if feature.in_progress:
- return json.dumps({"error": f"Feature with ID {feature_id} is already in-progress"})
+ # Check if already in progress by another agent
+ if feature.in_progress and feature.assigned_agent_id and agent_id:
+ if feature.assigned_agent_id != agent_id:
+ return json.dumps({
+ "error": f"Feature with ID {feature_id} is already in-progress by agent {feature.assigned_agent_id}"
+ })
feature.in_progress = True
+ if agent_id:
+ feature.assigned_agent_id = agent_id
session.commit()
session.refresh(feature)
@@ -348,6 +383,7 @@ def feature_clear_in_progress(
return json.dumps({"error": f"Feature with ID {feature_id} not found"})
feature.in_progress = False
+ feature.assigned_agent_id = None
session.commit()
session.refresh(feature)
@@ -356,6 +392,101 @@ def feature_clear_in_progress(
session.close()
+@mcp.tool()
+def feature_claim_next(
+ agent_id: Annotated[str, Field(description="The agent ID claiming the feature")]
+) -> str:
+ """Atomically get and claim the next available feature for an agent.
+
+ This is the preferred method for parallel agents to avoid race conditions.
+ It combines feature_get_next and feature_mark_in_progress into a single
+ atomic operation.
+
+ Args:
+ agent_id: The agent ID claiming the feature
+
+ Returns:
+ JSON with the claimed feature details, or error if no features available.
+ """
+ session = get_session()
+ try:
+ # Find the next available feature not assigned to another agent
+ # A feature is available if:
+ # 1. Not in progress AND not assigned to anyone, OR
+ # 2. Already assigned to this agent (allow re-claiming own feature)
+ feature = (
+ session.query(Feature)
+ .filter(Feature.passes == False)
+ .filter(
+ ((Feature.in_progress == False) & (Feature.assigned_agent_id == None)) |
+ (Feature.assigned_agent_id == agent_id)
+ )
+ .order_by(Feature.priority.asc(), Feature.id.asc())
+ .with_for_update() # Lock the row
+ .first()
+ )
+
+ if feature is None:
+ return json.dumps({"error": "No features available to claim. All are passing or assigned."})
+
+ # Claim the feature
+ feature.in_progress = True
+ feature.assigned_agent_id = agent_id
+ session.commit()
+ session.refresh(feature)
+
+ return json.dumps(feature.to_dict(), indent=2)
+ except Exception as e:
+ session.rollback()
+ return json.dumps({"error": f"Failed to claim feature: {str(e)}"})
+ finally:
+ session.close()
+
+
+@mcp.tool()
+def feature_release(
+ feature_id: Annotated[int, Field(description="The ID of the feature to release", ge=1)],
+ agent_id: Annotated[str, Field(default="", description="The agent ID releasing the feature")] = ""
+) -> str:
+ """Release a feature back to the queue without marking it as passing.
+
+ Use this when an agent needs to stop working on a feature but hasn't
+ completed it. The feature will be available for other agents to claim.
+
+ Args:
+ feature_id: The ID of the feature to release
+ agent_id: Optional agent ID for verification
+
+ Returns:
+ JSON with the updated feature details, or error if not found.
+ """
+ session = get_session()
+ try:
+ feature = session.query(Feature).filter(Feature.id == feature_id).first()
+
+ if feature is None:
+ return json.dumps({"error": f"Feature with ID {feature_id} not found"})
+
+ # Only release if the agent owns it or no agent specified
+ if agent_id and feature.assigned_agent_id and feature.assigned_agent_id != agent_id:
+ return json.dumps({
+ "error": f"Feature is assigned to agent {feature.assigned_agent_id}, not {agent_id}"
+ })
+
+ feature.in_progress = False
+ feature.assigned_agent_id = None
+ session.commit()
+ session.refresh(feature)
+
+ return json.dumps({
+ "released": True,
+ "feature": feature.to_dict(),
+ "message": f"Feature '{feature.name}' released back to queue"
+ }, indent=2)
+ finally:
+ session.close()
+
+
@mcp.tool()
def feature_create_bulk(
features: Annotated[list[dict], Field(description="List of features to create, each with category, name, description, and steps")]
diff --git a/parallel_agent_runner.py b/parallel_agent_runner.py
new file mode 100644
index 00000000..348b1b85
--- /dev/null
+++ b/parallel_agent_runner.py
@@ -0,0 +1,120 @@
+#!/usr/bin/env python3
+"""
+Parallel Agent Runner
+=====================
+
+Runs a single agent instance as part of a parallel agent pool.
+Each agent runs in its own worktree and identifies itself with an agent_id.
+
+This script is spawned by the ParallelAgentOrchestrator for each agent.
+"""
+
+import argparse
+import asyncio
+import os
+from pathlib import Path
+
+from dotenv import load_dotenv
+
+load_dotenv()
+
+from agent import run_autonomous_agent
+from registry import get_project_path
+
+
+DEFAULT_MODEL = "claude-sonnet-4-5-20250929"
+
+
+def parse_args() -> argparse.Namespace:
+ """Parse command line arguments."""
+ parser = argparse.ArgumentParser(
+ description="Parallel Agent Runner - Single agent in parallel pool",
+ )
+
+ parser.add_argument(
+ "--project-dir",
+ type=str,
+ required=True,
+ help="Main project directory (for database access)",
+ )
+
+ parser.add_argument(
+ "--worktree-dir",
+ type=str,
+ required=True,
+ help="Git worktree directory (agent's working directory)",
+ )
+
+ parser.add_argument(
+ "--agent-id",
+ type=str,
+ required=True,
+ help="Unique identifier for this agent",
+ )
+
+ parser.add_argument(
+ "--model",
+ type=str,
+ default=DEFAULT_MODEL,
+ help=f"Claude model to use (default: {DEFAULT_MODEL})",
+ )
+
+ parser.add_argument(
+ "--max-iterations",
+ type=int,
+ default=None,
+ help="Maximum iterations (default: unlimited)",
+ )
+
+ parser.add_argument(
+ "--yolo",
+ action="store_true",
+ default=False,
+ help="Enable YOLO mode (no browser testing)",
+ )
+
+ return parser.parse_args()
+
+
+def main() -> None:
+ """Main entry point."""
+ args = parse_args()
+
+ project_dir = Path(args.project_dir).resolve()
+ worktree_dir = Path(args.worktree_dir).resolve()
+
+ if not project_dir.exists():
+ print(f"Error: Project directory does not exist: {project_dir}")
+ return
+
+ if not worktree_dir.exists():
+ print(f"Error: Worktree directory does not exist: {worktree_dir}")
+ return
+
+ print(f"[{args.agent_id}] Starting agent")
+ print(f"[{args.agent_id}] Project dir: {project_dir}")
+ print(f"[{args.agent_id}] Worktree dir: {worktree_dir}")
+
+ # Set environment variable for MCP server to use main project dir for database
+ os.environ["PROJECT_DIR"] = str(project_dir)
+ os.environ["AGENT_ID"] = args.agent_id
+
+ try:
+ asyncio.run(
+ run_autonomous_agent(
+ project_dir=worktree_dir, # Agent works in worktree
+ model=args.model,
+ max_iterations=args.max_iterations,
+ yolo_mode=args.yolo,
+ agent_id=args.agent_id, # Pass agent_id to agent
+ )
+ )
+ except KeyboardInterrupt:
+ print(f"\n\n[{args.agent_id}] Interrupted by user")
+ except Exception as e:
+ print(f"\n[{args.agent_id}] Fatal error: {e}")
+ raise
+
+
+if __name__ == "__main__":
+ main()
diff --git a/parallel_agents.py b/parallel_agents.py
new file mode 100644
index 00000000..079aaa20
--- /dev/null
+++ b/parallel_agents.py
@@ -0,0 +1,417 @@
+"""
+Parallel Agent Orchestrator
+===========================
+
+Manages multiple Claude agents working in parallel on the same project.
+Each agent gets its own git worktree for isolation while sharing
+the same feature database.
+"""
+
+import asyncio
+import logging
+import subprocess
+import sys
+from dataclasses import dataclass, field
+from datetime import datetime
+from pathlib import Path
+from typing import Callable, Awaitable, Literal, Optional
+
+import psutil
+
+from worktree import WorktreeManager
+
+logger = logging.getLogger(__name__)
+
+
@dataclass
class AgentInfo:
    """Information about a running agent."""
    agent_id: str  # unique identifier, e.g. "agent-1"
    process: Optional[subprocess.Popen] = None  # handle to the spawned runner process
    worktree_path: Optional[Path] = None  # git worktree this agent works in
    status: Literal["stopped", "running", "paused", "crashed"] = "stopped"  # lifecycle state
    started_at: Optional[datetime] = None  # when the process was launched
    output_task: Optional[asyncio.Task] = None  # asyncio task streaming the process stdout
+
+
class ParallelAgentOrchestrator:
    """
    Orchestrates multiple parallel agents for a project.

    Each agent:
    - Gets its own git worktree for code isolation
    - Shares the same features.db for task coordination
    - Has an independent Claude session
    """

    def __init__(
        self,
        project_dir: Path,
        root_dir: Path,
        max_agents: int = 10,
    ):
        """
        Initialize the orchestrator.

        Args:
            project_dir: The main project directory
            root_dir: Root directory of autocoder (for scripts)
            max_agents: Maximum number of parallel agents
        """
        self.project_dir = project_dir.resolve()
        self.root_dir = root_dir
        self.max_agents = max_agents
        self.worktree_manager = WorktreeManager(project_dir)

        # Track agents by agent_id
        self.agents: dict[str, AgentInfo] = {}

        # Callbacks for output streaming and status changes
        self._output_callbacks: list[Callable[[str, str], Awaitable[None]]] = []
        self._status_callbacks: list[Callable[[str, str], Awaitable[None]]] = []

    def add_output_callback(self, callback: Callable[[str, str], Awaitable[None]]) -> None:
        """Add callback for agent output (agent_id, line)."""
        self._output_callbacks.append(callback)

    def add_status_callback(self, callback: Callable[[str, str], Awaitable[None]]) -> None:
        """Add callback for status changes (agent_id, status)."""
        self._status_callbacks.append(callback)

    async def _notify_output(self, agent_id: str, line: str) -> None:
        """Notify callbacks of agent output; callback errors are logged, not raised."""
        for callback in self._output_callbacks:
            try:
                await callback(agent_id, line)
            except Exception as e:
                logger.warning(f"Output callback error: {e}")

    async def _notify_status(self, agent_id: str, status: str) -> None:
        """Notify callbacks of a status change; callback errors are logged, not raised."""
        for callback in self._status_callbacks:
            try:
                await callback(agent_id, status)
            except Exception as e:
                logger.warning(f"Status callback error: {e}")

    def generate_agent_id(self, index: int) -> str:
        """Generate a unique agent ID (1-based, e.g. 'agent-1')."""
        return f"agent-{index + 1}"

    async def start_agents(
        self,
        num_agents: int,
        yolo_mode: bool = False,
        model: str = "claude-sonnet-4-5-20250929",
        max_iterations: Optional[int] = None,
    ) -> dict[str, bool]:
        """
        Start multiple agents in parallel.

        Args:
            num_agents: Number of agents to start (capped at max_agents)
            yolo_mode: Enable YOLO mode (no browser testing)
            model: Claude model to use
            max_iterations: Maximum iterations per agent (default: unlimited)

        Returns:
            Dict mapping agent_id to success status
        """
        num_agents = min(num_agents, self.max_agents)
        results: dict[str, bool] = {}

        # Worktrees require a git repo; bail out early for all agents if absent.
        if not self.worktree_manager.ensure_git_repo():
            logger.error("Failed to initialize git repository")
            return {self.generate_agent_id(i): False for i in range(num_agents)}

        # Start agents concurrently for faster initialization
        agent_ids = [self.generate_agent_id(i) for i in range(num_agents)]
        tasks = [self.start_agent(agent_id, yolo_mode, model, max_iterations) for agent_id in agent_ids]

        # Gather results, allowing individual failures
        start_results = await asyncio.gather(*tasks, return_exceptions=True)

        for agent_id, result in zip(agent_ids, start_results):
            if isinstance(result, Exception):
                logger.error(f"Failed to start {agent_id}: {result}")
                results[agent_id] = False
            else:
                results[agent_id] = result

        return results

    async def start_agent(
        self,
        agent_id: str,
        yolo_mode: bool = False,
        model: str = "claude-sonnet-4-5-20250929",
        max_iterations: Optional[int] = None,
    ) -> bool:
        """
        Start a single agent.

        Args:
            agent_id: Unique agent identifier
            yolo_mode: Enable YOLO mode
            model: Claude model to use
            max_iterations: Maximum iterations (default: unlimited)

        Returns:
            True if started successfully
        """
        # Refuse to start over a live process: "paused" agents still have a
        # (SIGSTOP'd) process; restarting would orphan it.
        if agent_id in self.agents and self.agents[agent_id].status in ("running", "paused"):
            logger.warning(f"Agent {agent_id} is already active")
            return False

        # Create worktree for this agent
        worktree_path = self.worktree_manager.create_worktree(agent_id)
        if worktree_path is None:
            logger.error(f"Failed to create worktree for agent {agent_id}")
            return False

        # Build command - use the parallel agent script
        cmd = [
            sys.executable,
            str(self.root_dir / "parallel_agent_runner.py"),
            "--project-dir", str(self.project_dir),
            "--worktree-dir", str(worktree_path),
            "--agent-id", agent_id,
            "--model", model,
        ]

        if yolo_mode:
            cmd.append("--yolo")

        if max_iterations is not None:
            cmd.extend(["--max-iterations", str(max_iterations)])

        try:
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                cwd=str(worktree_path),
            )

            agent_info = AgentInfo(
                agent_id=agent_id,
                process=process,
                worktree_path=worktree_path,
                status="running",
                started_at=datetime.now(),
            )
            self.agents[agent_id] = agent_info

            # Start output streaming
            agent_info.output_task = asyncio.create_task(
                self._stream_output(agent_id)
            )

            await self._notify_status(agent_id, "running")
            logger.info(f"Started agent {agent_id} (PID {process.pid})")
            return True

        except Exception:
            logger.exception(f"Failed to start agent {agent_id}")
            return False

    async def _stream_output(self, agent_id: str) -> None:
        """Stream stdout lines from an agent process to the output callbacks."""
        agent = self.agents.get(agent_id)
        if not agent or not agent.process or not agent.process.stdout:
            return

        try:
            loop = asyncio.get_running_loop()
            while True:
                # readline blocks, so run it in the default executor.
                line = await loop.run_in_executor(
                    None, agent.process.stdout.readline
                )
                if not line:
                    break

                decoded = line.decode("utf-8", errors="replace").rstrip()
                await self._notify_output(agent_id, decoded)

        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.warning(f"Output streaming error for {agent_id}: {e}")
        finally:
            # EOF usually means the process ended; classify it as stopped or
            # crashed based on the exit code and notify listeners.
            if agent.process and agent.process.poll() is not None:
                exit_code = agent.process.returncode
                if exit_code != 0 and agent.status == "running":
                    agent.status = "crashed"
                elif agent.status == "running":
                    agent.status = "stopped"
                await self._notify_status(agent_id, agent.status)

    async def stop_agent(self, agent_id: str) -> bool:
        """Stop a single agent (graceful terminate, then kill after 5s)."""
        agent = self.agents.get(agent_id)
        if not agent or not agent.process:
            return False

        try:
            # Cancel output streaming
            if agent.output_task:
                agent.output_task.cancel()
                try:
                    await agent.output_task
                except asyncio.CancelledError:
                    pass

            # A SIGSTOP'd process cannot act on SIGTERM until it is resumed,
            # so resume paused agents first to allow a graceful shutdown.
            if agent.status == "paused":
                try:
                    psutil.Process(agent.process.pid).resume()
                except psutil.Error:
                    logger.warning(f"Could not resume {agent_id} before stopping")

            # Terminate process
            agent.process.terminate()

            loop = asyncio.get_running_loop()
            try:
                await asyncio.wait_for(
                    loop.run_in_executor(None, agent.process.wait),
                    timeout=5.0
                )
            except asyncio.TimeoutError:
                # Graceful shutdown timed out; force kill.
                agent.process.kill()
                await loop.run_in_executor(None, agent.process.wait)

            agent.status = "stopped"
            agent.process = None
            await self._notify_status(agent_id, "stopped")

            logger.info(f"Stopped agent {agent_id}")
            return True

        except Exception:
            logger.exception(f"Failed to stop agent {agent_id}")
            return False

    async def stop_all_agents(self) -> None:
        """Stop all active agents (both running and paused)."""
        # Paused agents must be included: their processes are alive (just
        # SIGSTOP'd), and cleanup() removes their worktrees afterwards.
        tasks = []
        for agent_id in list(self.agents.keys()):
            if self.agents[agent_id].status in ("running", "paused"):
                tasks.append(self.stop_agent(agent_id))

        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)

    async def pause_agent(self, agent_id: str) -> bool:
        """
        Pause an agent using psutil suspend (SIGSTOP on Unix).

        WARNING: This uses SIGSTOP/SIGCONT which has known limitations:
        - The entire process and asyncio event loop are frozen
        - On resume, pending signals, timeouts, and callbacks fire in a burst
        - Outstanding async operations (API requests, I/O) may timeout or fail
        - Child watcher and signal handling may have unpredictable behavior

        For production use, consider stop/start semantics instead if the agent
        has long-running async operations that could be interrupted.
        """
        agent = self.agents.get(agent_id)
        if not agent or not agent.process or agent.status != "running":
            return False

        try:
            proc = psutil.Process(agent.process.pid)
            proc.suspend()
            agent.status = "paused"
            await self._notify_status(agent_id, "paused")
            return True
        except Exception:
            logger.exception(f"Failed to pause agent {agent_id}")
            return False

    async def resume_agent(self, agent_id: str) -> bool:
        """
        Resume a paused agent using psutil resume (SIGCONT on Unix).

        WARNING: See pause_agent docstring for important caveats about
        SIGSTOP/SIGCONT and asyncio. After resume, the agent may experience:
        - Burst of delayed timer/callback executions
        - Potential connection timeouts or stale state
        - Signal delivery ordering issues
        """
        agent = self.agents.get(agent_id)
        if not agent or not agent.process or agent.status != "paused":
            return False

        try:
            proc = psutil.Process(agent.process.pid)
            proc.resume()
            agent.status = "running"
            await self._notify_status(agent_id, "running")
            return True
        except Exception:
            logger.exception(f"Failed to resume agent {agent_id}")
            return False

    def get_agent_status(self, agent_id: str) -> dict:
        """Get status of a single agent as a JSON-serializable dict."""
        agent = self.agents.get(agent_id)
        if not agent:
            return {"agent_id": agent_id, "status": "unknown"}

        return {
            "agent_id": agent_id,
            "status": agent.status,
            "pid": agent.process.pid if agent.process else None,
            "started_at": agent.started_at.isoformat() if agent.started_at else None,
            "worktree_path": str(agent.worktree_path) if agent.worktree_path else None,
        }

    def get_all_statuses(self) -> list[dict]:
        """Get status of all agents."""
        return [self.get_agent_status(aid) for aid in self.agents]

    async def healthcheck(self) -> dict[str, bool]:
        """Check health of all agents; marks dead processes as crashed."""
        results = {}
        for agent_id, agent in self.agents.items():
            if not agent.process:
                # No process handle: healthy only if it was stopped cleanly.
                results[agent_id] = agent.status == "stopped"
                continue

            poll = agent.process.poll()
            if poll is not None:
                # Process exited while we believed it was alive.
                if agent.status in ("running", "paused"):
                    agent.status = "crashed"
                    await self._notify_status(agent_id, "crashed")
                results[agent_id] = False
            else:
                results[agent_id] = True

        return results

    async def merge_all_worktrees(self) -> dict[str, bool]:
        """Merge changes from all agent worktrees back to main."""
        results = {}
        for agent_id in self.agents:
            success = self.worktree_manager.merge_worktree_changes(agent_id)
            results[agent_id] = success
        return results

    async def cleanup(self) -> None:
        """Stop all agents and clean up worktrees."""
        await self.stop_all_agents()
        self.worktree_manager.cleanup_all_worktrees()
+
+
# Global orchestrator registry
_orchestrators: dict[str, ParallelAgentOrchestrator] = {}


def get_orchestrator(
    project_name: str,
    project_dir: Path,
    root_dir: Path,
    max_agents: int = 3,
) -> ParallelAgentOrchestrator:
    """Return the project's orchestrator, creating and caching it on first use."""
    try:
        return _orchestrators[project_name]
    except KeyError:
        orchestrator = ParallelAgentOrchestrator(project_dir, root_dir, max_agents)
        _orchestrators[project_name] = orchestrator
        return orchestrator
diff --git a/server/main.py b/server/main.py
index f48e9f2e..37f33952 100644
--- a/server/main.py
+++ b/server/main.py
@@ -20,6 +20,7 @@
assistant_chat_router,
features_router,
filesystem_router,
+ parallel_agents_router,
projects_router,
spec_creation_router,
)
@@ -89,6 +90,7 @@ async def require_localhost(request: Request, call_next):
app.include_router(projects_router)
app.include_router(features_router)
app.include_router(agent_router)
+app.include_router(parallel_agents_router) # Parallel agent management
app.include_router(spec_creation_router)
app.include_router(filesystem_router)
app.include_router(assistant_chat_router)
diff --git a/server/routers/__init__.py b/server/routers/__init__.py
index 48b4f804..3d9bfd5a 100644
--- a/server/routers/__init__.py
+++ b/server/routers/__init__.py
@@ -9,6 +9,7 @@
from .assistant_chat import router as assistant_chat_router
from .features import router as features_router
from .filesystem import router as filesystem_router
+from .parallel_agents import router as parallel_agents_router
from .projects import router as projects_router
from .spec_creation import router as spec_creation_router
@@ -16,6 +17,7 @@
"projects_router",
"features_router",
"agent_router",
+ "parallel_agents_router",
"spec_creation_router",
"filesystem_router",
"assistant_chat_router",
diff --git a/server/routers/features.py b/server/routers/features.py
index 3329a68f..0d463487 100644
--- a/server/routers/features.py
+++ b/server/routers/features.py
@@ -90,6 +90,7 @@ def feature_to_response(f) -> FeatureResponse:
steps=f.steps if isinstance(f.steps, list) else [],
passes=f.passes,
in_progress=f.in_progress,
+ assigned_agent_id=f.assigned_agent_id,
)
diff --git a/server/routers/parallel_agents.py b/server/routers/parallel_agents.py
new file mode 100644
index 00000000..899356af
--- /dev/null
+++ b/server/routers/parallel_agents.py
@@ -0,0 +1,274 @@
+"""
+Parallel Agents Router
+======================
+
+API endpoints for parallel agent control (start/stop multiple agents).
+Uses git worktrees for agent isolation.
+"""
+
+import re
+from pathlib import Path
+
+from fastapi import APIRouter, HTTPException
+
+from ..schemas import (
+ ParallelAgentStartRequest,
+ ParallelAgentsStatus,
+ ParallelAgentInfo,
+ ParallelAgentActionResponse,
+)
+
+
def _get_project_path(project_name: str) -> Path:
    """Resolve a project name to its path via the top-level registry module."""
    import sys

    # The registry module lives at the repo root, three levels up from here;
    # make sure that directory is importable before importing it.
    repo_root = Path(__file__).parent.parent.parent
    root_str = str(repo_root)
    if root_str not in sys.path:
        sys.path.insert(0, root_str)

    from registry import get_project_path

    return get_project_path(project_name)
+
+
# All endpoints are scoped under a single project's parallel-agents namespace.
router = APIRouter(prefix="/api/projects/{project_name}/parallel-agents", tags=["parallel-agents"])

# Root directory for autocoder
ROOT_DIR = Path(__file__).parent.parent.parent
+
+
def validate_project_name(name: str) -> str:
    """Validate and sanitize project name to prevent path traversal.

    Args:
        name: Candidate project name.

    Returns:
        The validated name, unchanged.

    Raises:
        HTTPException: 400 if the name is not 1-50 chars of [A-Za-z0-9_-].
    """
    # Use re.fullmatch instead of re.match with '$': '$' also matches just
    # before a trailing newline, so "name\n" would slip past the old pattern.
    if not re.fullmatch(r'[a-zA-Z0-9_-]{1,50}', name):
        raise HTTPException(
            status_code=400,
            detail="Invalid project name"
        )
    return name
+
+
def get_orchestrator(project_name: str):
    """Get or create the parallel agent orchestrator for a project.

    Raises HTTPException 400 for an invalid name, 404 for an unknown project
    or a missing project directory.
    """
    from parallel_agents import get_orchestrator as _get_orchestrator

    project_name = validate_project_name(project_name)
    project_dir = _get_project_path(project_name)

    if not project_dir:
        raise HTTPException(status_code=404, detail=f"Project '{project_name}' not found in registry")

    if not project_dir.exists():
        raise HTTPException(status_code=404, detail=f"Project directory not found: {project_dir}")

    orchestrator = _get_orchestrator(project_name, project_dir, ROOT_DIR)
    return orchestrator
+
+
@router.get("/status", response_model=ParallelAgentsStatus)
async def get_parallel_agents_status(project_name: str):
    """Get the status of all parallel agents for a project."""
    orchestrator = get_orchestrator(project_name)

    # Refresh statuses first so crashed processes are reported accurately.
    await orchestrator.healthcheck()

    agents = [
        ParallelAgentInfo(
            agent_id=agent_id,
            status=agent.status,
            pid=agent.process.pid if agent.process else None,
            started_at=agent.started_at,
            worktree_path=str(agent.worktree_path) if agent.worktree_path else None,
        )
        for agent_id, agent in orchestrator.agents.items()
    ]
    running_count = sum(
        1 for agent in orchestrator.agents.values() if agent.status == "running"
    )

    return ParallelAgentsStatus(
        agents=agents,
        total_running=running_count,
        max_agents=orchestrator.max_agents,
    )
+
+
@router.post("/start", response_model=ParallelAgentActionResponse)
async def start_parallel_agents(
    project_name: str,
    request: ParallelAgentStartRequest | None = None,
):
    """Start multiple parallel agents for a project.

    The request body is optional; omitted fields use the schema defaults.
    """
    # Build the default per call rather than sharing one mutable model
    # instance across every request (mutable-default-argument pitfall).
    if request is None:
        request = ParallelAgentStartRequest()

    orchestrator = get_orchestrator(project_name)

    results = await orchestrator.start_agents(
        num_agents=request.num_agents,
        yolo_mode=request.yolo_mode,
        max_iterations=request.max_iterations,
    )

    all_success = all(results.values())
    success_count = sum(1 for v in results.values() if v)

    return ParallelAgentActionResponse(
        success=all_success,
        agents=results,
        message=f"Started {success_count}/{request.num_agents} agents",
    )
+
+
@router.post("/stop", response_model=ParallelAgentActionResponse)
async def stop_all_parallel_agents(project_name: str):
    """Stop all parallel agents for a project."""
    orchestrator = get_orchestrator(project_name)

    # Snapshot which agents are running before stopping so we can report them.
    running_agents = [
        agent_id
        for agent_id, info in orchestrator.agents.items()
        if info.status == "running"
    ]

    await orchestrator.stop_all_agents()

    return ParallelAgentActionResponse(
        success=True,
        agents={agent_id: True for agent_id in running_agents},
        message=f"Stopped {len(running_agents)} agents",
    )
+
+
@router.post("/pause", response_model=ParallelAgentActionResponse)
async def pause_all_parallel_agents(project_name: str):
    """Pause all running parallel agents for a project."""
    orchestrator = get_orchestrator(project_name)

    running_agents = [
        agent_id
        for agent_id, info in orchestrator.agents.items()
        if info.status == "running"
    ]

    # Pause sequentially, recording per-agent success.
    results = {
        agent_id: await orchestrator.pause_agent(agent_id)
        for agent_id in running_agents
    }
    paused_count = sum(1 for ok in results.values() if ok)

    return ParallelAgentActionResponse(
        success=paused_count == len(running_agents),
        agents=results,
        message=f"Paused {paused_count}/{len(running_agents)} agents",
    )
+
+
@router.post("/resume", response_model=ParallelAgentActionResponse)
async def resume_all_parallel_agents(project_name: str):
    """Resume all paused parallel agents for a project."""
    orchestrator = get_orchestrator(project_name)

    paused_agents = [
        agent_id
        for agent_id, info in orchestrator.agents.items()
        if info.status == "paused"
    ]

    # Resume sequentially, recording per-agent success.
    results = {
        agent_id: await orchestrator.resume_agent(agent_id)
        for agent_id in paused_agents
    }
    resumed_count = sum(1 for ok in results.values() if ok)

    return ParallelAgentActionResponse(
        success=resumed_count == len(paused_agents),
        agents=results,
        message=f"Resumed {resumed_count}/{len(paused_agents)} agents",
    )
+
+
@router.post("/{agent_id}/start")
async def start_single_agent(
    project_name: str,
    agent_id: str,
    yolo_mode: bool = False,
):
    """Start a single parallel agent."""
    orchestrator = get_orchestrator(project_name)

    started = await orchestrator.start_agent(agent_id, yolo_mode=yolo_mode)

    info = orchestrator.agents.get(agent_id)
    status = info.status if info else "unknown"
    return {
        "success": started,
        "agent_id": agent_id,
        "status": status,
    }
+
+
@router.post("/{agent_id}/stop")
async def stop_single_agent(project_name: str, agent_id: str):
    """Stop a single parallel agent."""
    orchestrator = get_orchestrator(project_name)
    stopped = await orchestrator.stop_agent(agent_id)
    return {
        "success": stopped,
        "agent_id": agent_id,
        "status": "stopped" if stopped else "unknown",
    }
+
+
@router.post("/{agent_id}/pause")
async def pause_single_agent(project_name: str, agent_id: str):
    """Pause a single parallel agent."""
    orchestrator = get_orchestrator(project_name)
    paused = await orchestrator.pause_agent(agent_id)
    return {
        "success": paused,
        "agent_id": agent_id,
        "status": "paused" if paused else "unknown",
    }
+
+
@router.post("/{agent_id}/resume")
async def resume_single_agent(project_name: str, agent_id: str):
    """Resume a single parallel agent."""
    orchestrator = get_orchestrator(project_name)
    resumed = await orchestrator.resume_agent(agent_id)
    return {
        "success": resumed,
        "agent_id": agent_id,
        "status": "running" if resumed else "unknown",
    }
+
+
@router.post("/merge")
async def merge_all_worktrees(project_name: str):
    """Merge changes from all agent worktrees back to main branch."""
    orchestrator = get_orchestrator(project_name)

    results = await orchestrator.merge_all_worktrees()
    merged_count = sum(1 for ok in results.values() if ok)

    return {
        "success": all(results.values()),
        "agents": results,
        "message": f"Merged {merged_count}/{len(results)} worktrees",
    }
+
+
@router.post("/cleanup")
async def cleanup_parallel_agents(project_name: str):
    """Stop all agents and clean up worktrees."""
    orchestrator = get_orchestrator(project_name)
    # cleanup() stops every agent, then removes all worktrees.
    await orchestrator.cleanup()
    return {
        "success": True,
        "message": "All agents stopped and worktrees cleaned up",
    }
diff --git a/server/routers/projects.py b/server/routers/projects.py
index d1c2b6c7..69b7d12c 100644
--- a/server/routers/projects.py
+++ b/server/routers/projects.py
@@ -277,6 +277,7 @@ async def get_project_prompts(name: str):
prompts_dir = _get_project_prompts_dir(project_dir)
def read_file(filename: str) -> str:
+ """Read a prompt file, returning empty string if not found."""
filepath = prompts_dir / filename
if filepath.exists():
try:
@@ -311,6 +312,7 @@ async def update_project_prompts(name: str, prompts: ProjectPromptsUpdate):
prompts_dir.mkdir(parents=True, exist_ok=True)
def write_file(filename: str, content: str | None):
+ """Write content to a prompt file if content is not None."""
if content is not None:
filepath = prompts_dir / filename
filepath.write_text(content, encoding="utf-8")
diff --git a/server/schemas.py b/server/schemas.py
index 5531a448..0ea685fc 100644
--- a/server/schemas.py
+++ b/server/schemas.py
@@ -84,6 +84,7 @@ class FeatureResponse(FeatureBase):
priority: int
passes: bool
in_progress: bool
+ assigned_agent_id: str | None = None # Agent working on this feature
class Config:
from_attributes = True
@@ -120,6 +121,40 @@ class AgentActionResponse(BaseModel):
message: str = ""
+# ============================================================================
+# Parallel Agent Schemas
+# ============================================================================
+
class ParallelAgentStartRequest(BaseModel):
    """Request schema for starting parallel agents."""
    num_agents: int = Field(default=2, ge=1, le=10, description="Number of agents to start")
    yolo_mode: bool = False  # when True, agents skip browser-based testing
    max_iterations: int | None = Field(default=None, ge=1, description="Maximum iterations per agent")
+
+
class ParallelAgentInfo(BaseModel):
    """Information about a single parallel agent."""
    agent_id: str  # e.g. "agent-1"
    status: Literal["stopped", "running", "paused", "crashed", "unknown"]  # lifecycle state
    pid: int | None = None  # OS process id while the agent process exists
    started_at: datetime | None = None  # launch time, if the agent has started
    worktree_path: str | None = None  # git worktree the agent works in
+
+
class ParallelAgentsStatus(BaseModel):
    """Status of all parallel agents."""
    agents: list[ParallelAgentInfo]  # one entry per known agent
    total_running: int  # count of agents currently in the "running" state
    max_agents: int  # orchestrator's configured concurrency cap
+
+
class ParallelAgentActionResponse(BaseModel):
    """Response for parallel agent control actions."""
    success: bool  # True only if the action succeeded for every targeted agent
    agents: dict[str, bool]  # agent_id -> success
    message: str = ""  # human-readable summary of the action
+
+
# ============================================================================
# Setup Schemas
# ============================================================================
diff --git a/server/services/process_manager.py b/server/services/process_manager.py
index d2b4f0b3..8adade13 100644
--- a/server/services/process_manager.py
+++ b/server/services/process_manager.py
@@ -85,10 +85,12 @@ def __init__(
@property
def status(self) -> Literal["stopped", "running", "paused", "crashed"]:
+ """Get the current agent process status."""
return self._status
@status.setter
def status(self, value: Literal["stopped", "running", "paused", "crashed"]):
+ """Set status and notify callbacks if changed."""
old_status = self._status
self._status = value
if old_status != value:
@@ -137,6 +139,7 @@ def remove_status_callback(self, callback: Callable[[str], Awaitable[None]]) ->
@property
def pid(self) -> int | None:
+ """Get the process ID of the running agent, or None if not running."""
return self.process.pid if self.process else None
def _check_lock(self) -> bool:
diff --git a/server/websocket.py b/server/websocket.py
index 23139009..9437b477 100644
--- a/server/websocket.py
+++ b/server/websocket.py
@@ -51,6 +51,7 @@ class ConnectionManager:
"""Manages WebSocket connections per project."""
def __init__(self):
+ """Initialize the connection manager with empty connection registry."""
# project_name -> set of WebSocket connections
self.active_connections: dict[str, Set[WebSocket]] = {}
self._lock = asyncio.Lock()
diff --git a/ui/src/App.tsx b/ui/src/App.tsx
index 794c5a2e..3ac7614d 100644
--- a/ui/src/App.tsx
+++ b/ui/src/App.tsx
@@ -8,6 +8,7 @@ const STORAGE_KEY = 'autocoder-selected-project'
import { ProjectSelector } from './components/ProjectSelector'
import { KanbanBoard } from './components/KanbanBoard'
import { AgentControl } from './components/AgentControl'
+import { ParallelAgentControl } from './components/ParallelAgentControl'
import { ProgressDashboard } from './components/ProgressDashboard'
import { SetupWizard } from './components/SetupWizard'
import { AddFeatureForm } from './components/AddFeatureForm'
@@ -34,6 +35,7 @@ function App() {
const [debugOpen, setDebugOpen] = useState(false)
const [debugPanelHeight, setDebugPanelHeight] = useState(288) // Default height
const [assistantOpen, setAssistantOpen] = useState(false)
+ const [parallelMode, setParallelMode] = useState(false)
const { data: projects, isLoading: projectsLoading } = useProjects()
const { data: features } = useFeatures(selectedProject)
@@ -160,10 +162,17 @@ function App() {
-