From 4cab943b375cd00252d62d552ba1b9d160859827 Mon Sep 17 00:00:00 2001 From: Ofer Shaal Date: Tue, 6 Jan 2026 15:35:00 -0500 Subject: [PATCH 1/7] feat: Add parallel subagent execution support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the ability to run multiple Claude agents in parallel to speed up project completion (GitHub issue #5). ## Key Features: - **Git worktrees**: Each agent gets isolated working directory - **Atomic task claiming**: Prevents race conditions between agents - **Agent assignment tracking**: Features show which agent is working on them - **Configurable parallelism**: --num-agents flag for CLI, API for UI ## New Files: - worktree.py: Git worktree management for agent isolation - parallel_agents.py: Orchestrator for managing multiple agents - parallel_agent_runner.py: Individual agent process runner - server/routers/parallel_agents.py: REST API for parallel agent control ## Changes: - Database schema: Added assigned_agent_id column to features - MCP server: New atomic feature_claim_next() tool - UI: Agent badges on feature cards - CLI: --num-agents option for parallel execution Closes #5 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- CLAUDE.md | 36 +++ agent.py | 25 +- api/database.py | 20 +- autonomous_agent_demo.py | 99 ++++++-- client.py | 33 ++- mcp_server/feature_mcp.py | 150 ++++++++++-- parallel_agent_runner.py | 120 ++++++++++ parallel_agents.py | 383 ++++++++++++++++++++++++++++++ server/main.py | 3 +- server/routers/__init__.py | 2 + server/routers/parallel_agents.py | 224 +++++++++++++++++ server/schemas.py | 34 +++ ui/src/components/FeatureCard.tsx | 71 ++++-- ui/src/lib/types.ts | 22 ++ ui/tsconfig.tsbuildinfo | 2 +- worktree.py | 306 ++++++++++++++++++++++++ 16 files changed, 1462 insertions(+), 68 deletions(-) create mode 100644 parallel_agent_runner.py create mode 100644 parallel_agents.py create mode 100644 server/routers/parallel_agents.py create mode 100644 worktree.py diff --git a/CLAUDE.md b/CLAUDE.md index 51c09493..43aa700e 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -71,6 +71,42 @@ python autonomous_agent_demo.py --project-dir my-app --yolo **When to use:** Early prototyping when you want to quickly scaffold features without verification overhead. Switch back to standard mode for production-quality development. +### Parallel Agents Mode + +Run multiple agents simultaneously to speed up project completion: + +```bash +# CLI: Run 3 agents in parallel +python autonomous_agent_demo.py --project-dir my-app --num-agents 3 + +# Combine with YOLO mode for fastest iteration +python autonomous_agent_demo.py --project-dir my-app --num-agents 3 --yolo + +# API: Start parallel agents via REST +POST /api/projects/{project_name}/parallel-agents/start +Body: { "num_agents": 3, "yolo_mode": false } +``` + +**How it works:** +- Each agent gets its own **git worktree** for code isolation +- All agents share the same **features.db** for task coordination +- Features are **atomically claimed** to prevent conflicts +- Agent assignment is tracked via `assigned_agent_id` on features + +**API Endpoints:** +- `GET /api/projects/{name}/parallel-agents/status` - Get status of all agents +- `POST /api/projects/{name}/parallel-agents/start` - Start N agents +- `POST /api/projects/{name}/parallel-agents/stop` - Stop all agents +- `POST /api/projects/{name}/parallel-agents/merge` - Merge worktree changes +- `POST /api/projects/{name}/parallel-agents/cleanup` - Stop and cleanup + +**MCP Tools for Parallel Agents:** +- `feature_claim_next(agent_id)` - Atomically claim next available feature +- `feature_release(feature_id, agent_id)` - Release feature back to queue +- `feature_get_next(agent_id)` - Get next feature (excludes others' assignments) + +**When to use:** When you have many independent features and want to parallelize implementation. Best combined with YOLO mode for maximum speed. + ### React UI (in ui/ directory) ```bash diff --git a/agent.py b/agent.py index 7b6ef874..d5515bfe 100644 --- a/agent.py +++ b/agent.py @@ -113,6 +113,7 @@ async def run_autonomous_agent( model: str, max_iterations: Optional[int] = None, yolo_mode: bool = False, + agent_id: Optional[str] = None, ) -> None: """ Run the autonomous agent loop. @@ -122,20 +123,28 @@ async def run_autonomous_agent( model: Claude model to use max_iterations: Maximum number of iterations (None for unlimited) yolo_mode: If True, skip browser testing and use YOLO prompt + agent_id: Optional agent identifier for parallel execution """ + prefix = f"[{agent_id}] " if agent_id else "" + print("\n" + "=" * 70) - print(" AUTONOMOUS CODING AGENT DEMO") + if agent_id: + print(f" AUTONOMOUS CODING AGENT - {agent_id}") + else: + print(" AUTONOMOUS CODING AGENT DEMO") print("=" * 70) - print(f"\nProject directory: {project_dir}") - print(f"Model: {model}") + print(f"\n{prefix}Project directory: {project_dir}") + print(f"{prefix}Model: {model}") + if agent_id: + print(f"{prefix}Agent ID: {agent_id}") if yolo_mode: - print("Mode: YOLO (testing disabled)") + print(f"{prefix}Mode: YOLO (testing disabled)") else: - print("Mode: Standard (full testing)") + print(f"{prefix}Mode: Standard (full testing)") if max_iterations: - print(f"Max iterations: {max_iterations}") + print(f"{prefix}Max iterations: {max_iterations}") else: - print("Max iterations: Unlimited (will run until completion)") + print(f"{prefix}Max iterations: Unlimited (will run until completion)") print() # Create project directory @@ -177,7 +186,7 @@ async def run_autonomous_agent( print_session_header(iteration, is_first_run) # Create client (fresh context) - client = create_client(project_dir, model, yolo_mode=yolo_mode) + client = create_client(project_dir, model, yolo_mode=yolo_mode, agent_id=agent_id) # Choose prompt based on session type # Pass project_dir to enable project-specific prompts diff --git a/api/database.py b/api/database.py index f05f92fc..d430caf7 100644 --- a/api/database.py +++ b/api/database.py @@ -29,6 +29,7 @@ class Feature(Base): steps = Column(JSON, nullable=False) # Stored as JSON array passes = Column(Boolean, default=False, index=True) in_progress = Column(Boolean, default=False, index=True) + assigned_agent_id = Column(String(50), nullable=True, index=True) # Agent working on this feature def to_dict(self) -> dict: """Convert feature to dictionary for JSON serialization.""" @@ -41,6 +42,7 @@ def to_dict(self) -> dict: "steps": self.steps, "passes": self.passes, "in_progress": self.in_progress, + "assigned_agent_id": self.assigned_agent_id, } @@ -73,6 +75,21 @@ def _migrate_add_in_progress_column(engine) -> None: conn.commit() +def _migrate_add_assigned_agent_id_column(engine) -> None: + """Add assigned_agent_id column to existing databases that don't have it.""" + from sqlalchemy import text + + with engine.connect() as conn: + # Check if column exists + result = conn.execute(text("PRAGMA table_info(features)")) + columns = [row[1] for row in result.fetchall()] + + if "assigned_agent_id" not in columns: + # Add the column (nullable, no default) + conn.execute(text("ALTER TABLE features ADD COLUMN assigned_agent_id VARCHAR(50)")) + conn.commit() + + def create_database(project_dir: Path) -> tuple: """ Create database and return engine + session maker. @@ -87,8 +104,9 @@ def create_database(project_dir: Path) -> tuple: engine = create_engine(db_url, connect_args={"check_same_thread": False}) Base.metadata.create_all(bind=engine) - # Migrate existing databases to add in_progress column + # Migrate existing databases to add new columns _migrate_add_in_progress_column(engine) + _migrate_add_assigned_agent_id_column(engine) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) return engine, SessionLocal diff --git a/autonomous_agent_demo.py b/autonomous_agent_demo.py index caa24916..fd2f9529 100644 --- a/autonomous_agent_demo.py +++ b/autonomous_agent_demo.py @@ -19,6 +19,9 @@ # YOLO mode: rapid prototyping without browser testing python autonomous_agent_demo.py --project-dir my-app --yolo + + # Parallel agents: run multiple agents simultaneously + python autonomous_agent_demo.py --project-dir my-app --num-agents 3 """ import argparse @@ -97,6 +100,13 @@ def parse_args() -> argparse.Namespace: help="Enable YOLO mode: rapid prototyping without browser testing", ) + parser.add_argument( + "--num-agents", + type=int, + default=1, + help="Number of parallel agents to run (default: 1, max: 10)", + ) + return parser.parse_args() @@ -128,22 +138,81 @@ def main() -> None: print("Use an absolute path or register the project first.") return - try: - # Run the agent (MCP server handles feature database) - asyncio.run( - run_autonomous_agent( - project_dir=project_dir, - model=args.model, - max_iterations=args.max_iterations, - yolo_mode=args.yolo, - ) + # Check if parallel mode requested + num_agents = min(args.num_agents, 10) # Cap at 10 agents + + if num_agents > 1: + # Parallel agent mode + from parallel_agents import ParallelAgentOrchestrator + + print(f"\n{'=' * 70}") + print(f" PARALLEL AGENT MODE - {num_agents} AGENTS") + print("=" * 70) + print(f"\nProject directory: {project_dir}") + print(f"Model: {args.model}") + print(f"Number of agents: {num_agents}") + if args.yolo: + print("Mode: YOLO (testing disabled)") + print() + + root_dir = Path(__file__).parent + orchestrator = ParallelAgentOrchestrator( + project_dir=project_dir, + root_dir=root_dir, + max_agents=num_agents, ) - except KeyboardInterrupt: - print("\n\nInterrupted by user") - print("To resume, run the same command again") - except Exception as e: - print(f"\nFatal error: {e}") - raise + + async def run_parallel(): + try: + results = await orchestrator.start_agents( + num_agents=num_agents, + yolo_mode=args.yolo, + model=args.model, + ) + print(f"\nStarted agents: {results}") + + # Wait for all agents to complete (or user interrupt) + while True: + health = await orchestrator.healthcheck() + running = sum(1 for v in health.values() if v) + if running == 0: + print("\nAll agents have finished.") + break + await asyncio.sleep(5) + + except KeyboardInterrupt: + print("\n\nInterrupted - stopping all agents...") + await orchestrator.stop_all_agents() + finally: + # Optional: merge worktree changes + print("\nCleaning up worktrees...") + await orchestrator.cleanup() + + try: + asyncio.run(run_parallel()) + except KeyboardInterrupt: + print("\n\nInterrupted by user") + except Exception as e: + print(f"\nFatal error: {e}") + raise + else: + # Single agent mode (original behavior) + try: + # Run the agent (MCP server handles feature database) + asyncio.run( + run_autonomous_agent( + project_dir=project_dir, + model=args.model, + max_iterations=args.max_iterations, + yolo_mode=args.yolo, + ) + ) + except KeyboardInterrupt: + print("\n\nInterrupted by user") + print("To resume, run the same command again") + except Exception as e: + print(f"\nFatal error: {e}") + raise if __name__ == "__main__": diff --git a/client.py b/client.py index 55747913..eefe27e1 100644 --- a/client.py +++ b/client.py @@ -26,6 +26,9 @@ "mcp__features__feature_mark_passing", "mcp__features__feature_skip", "mcp__features__feature_create_bulk", + "mcp__features__feature_claim_next", # Atomic claim for parallel agents + "mcp__features__feature_release", # Release feature back to queue + "mcp__features__feature_clear_in_progress", ] # Playwright MCP tools for browser automation @@ -74,7 +77,12 @@ ] -def create_client(project_dir: Path, model: str, yolo_mode: bool = False): +def create_client( + project_dir: Path, + model: str, + yolo_mode: bool = False, + agent_id: str | None = None, +): """ Create a Claude Agent SDK client with multi-layered security. @@ -82,6 +90,7 @@ def create_client(project_dir: Path, model: str, yolo_mode: bool = False): project_dir: Directory for the project model: Claude model to use yolo_mode: If True, skip Playwright MCP server for rapid prototyping + agent_id: Optional agent identifier for parallel execution Returns: Configured ClaudeSDKClient (from claude_agent_sdk) @@ -150,6 +159,8 @@ def create_client(project_dir: Path, model: str, yolo_mode: bool = False): else: print(" - MCP servers: playwright (browser), features (database)") print(" - Project settings enabled (skills, commands, CLAUDE.md)") + if agent_id: + print(f" - Parallel mode: Agent ID = {agent_id}") print() # Use system Claude CLI instead of bundled one (avoids Bun runtime crash on Windows) @@ -160,17 +171,23 @@ def create_client(project_dir: Path, model: str, yolo_mode: bool = False): print(" - Warning: System Claude CLI not found, using bundled CLI") # Build MCP servers config - features is always included, playwright only in standard mode + mcp_env = { + # Inherit parent environment (PATH, ANTHROPIC_API_KEY, etc.) + **os.environ, + # Add custom variables + "PROJECT_DIR": str(project_dir.resolve()), + "PYTHONPATH": str(Path(__file__).parent.resolve()), + } + + # Add agent_id for parallel execution + if agent_id: + mcp_env["AGENT_ID"] = agent_id + mcp_servers = { "features": { "command": sys.executable, # Use the same Python that's running this script "args": ["-m", "mcp_server.feature_mcp"], - "env": { - # Inherit parent environment (PATH, ANTHROPIC_API_KEY, etc.) - **os.environ, - # Add custom variables - "PROJECT_DIR": str(project_dir.resolve()), - "PYTHONPATH": str(Path(__file__).parent.resolve()), - }, + "env": mcp_env, }, } if not yolo_mode: diff --git a/mcp_server/feature_mcp.py b/mcp_server/feature_mcp.py index 8c5f3c83..6d62a900 100644 --- a/mcp_server/feature_mcp.py +++ b/mcp_server/feature_mcp.py @@ -142,27 +142,38 @@ def feature_get_stats() -> str: @mcp.tool() -def feature_get_next() -> str: +def feature_get_next( + agent_id: Annotated[str, Field(default="", description="Optional agent ID to filter out features being worked on by other agents")] = "" +) -> str: """Get the highest-priority pending feature to work on. - Returns the feature with the lowest priority number that has passes=false. + Returns the feature with the lowest priority number that has passes=false + and is not currently being worked on by another agent. Use this at the start of each coding session to determine what to implement next. + Args: + agent_id: Optional agent ID. If provided, excludes features assigned to other agents. + Returns: - JSON with feature details (id, priority, category, name, description, steps, passes, in_progress) - or error message if all features are passing. + JSON with feature details (id, priority, category, name, description, steps, passes, in_progress, assigned_agent_id) + or error message if all features are passing or assigned. """ session = get_session() try: - feature = ( - session.query(Feature) - .filter(Feature.passes == False) - .order_by(Feature.priority.asc(), Feature.id.asc()) - .first() - ) + query = session.query(Feature).filter(Feature.passes == False) + + # If agent_id provided, exclude features assigned to other agents + if agent_id: + query = query.filter( + (Feature.assigned_agent_id == None) | + (Feature.assigned_agent_id == "") | + (Feature.assigned_agent_id == agent_id) + ) + + feature = query.order_by(Feature.priority.asc(), Feature.id.asc()).first() if feature is None: - return json.dumps({"error": "All features are passing! No more work to do."}) + return json.dumps({"error": "All features are passing or assigned to other agents! No more work to do."}) return json.dumps(feature.to_dict(), indent=2) finally: @@ -209,8 +220,9 @@ def feature_mark_passing( ) -> str: """Mark a feature as passing after successful implementation. - Updates the feature's passes field to true and clears the in_progress flag. - Use this after you have implemented the feature and verified it works correctly. + Updates the feature's passes field to true and clears the in_progress flag + and agent assignment. Use this after you have implemented the feature and + verified it works correctly. Args: feature_id: The ID of the feature to mark as passing @@ -227,6 +239,7 @@ def feature_mark_passing( feature.passes = True feature.in_progress = False + feature.assigned_agent_id = None # Clear agent assignment on completion session.commit() session.refresh(feature) @@ -290,7 +303,8 @@ def feature_skip( @mcp.tool() def feature_mark_in_progress( - feature_id: Annotated[int, Field(description="The ID of the feature to mark as in-progress", ge=1)] + feature_id: Annotated[int, Field(description="The ID of the feature to mark as in-progress", ge=1)], + agent_id: Annotated[str, Field(default="", description="Optional agent ID to assign this feature to")] = "" ) -> str: """Mark a feature as in-progress. Call immediately after feature_get_next(). @@ -299,9 +313,10 @@ def feature_mark_in_progress( Args: feature_id: The ID of the feature to mark as in-progress + agent_id: Optional agent ID to assign this feature to Returns: - JSON with the updated feature details, or error if not found or already in-progress. + JSON with the updated feature details, or error if not found or already in-progress by another agent. """ session = get_session() try: @@ -313,10 +328,16 @@ def feature_mark_in_progress( if feature.passes: return json.dumps({"error": f"Feature with ID {feature_id} is already passing"}) - if feature.in_progress: - return json.dumps({"error": f"Feature with ID {feature_id} is already in-progress"}) + # Check if already in progress by another agent + if feature.in_progress and feature.assigned_agent_id and agent_id: + if feature.assigned_agent_id != agent_id: + return json.dumps({ + "error": f"Feature with ID {feature_id} is already in-progress by agent {feature.assigned_agent_id}" + }) feature.in_progress = True + if agent_id: + feature.assigned_agent_id = agent_id session.commit() session.refresh(feature) @@ -348,6 +369,7 @@ def feature_clear_in_progress( return json.dumps({"error": f"Feature with ID {feature_id} not found"}) feature.in_progress = False + feature.assigned_agent_id = None session.commit() session.refresh(feature) @@ -356,6 +378,100 @@ def feature_clear_in_progress( session.close() +@mcp.tool() +def feature_claim_next( + agent_id: Annotated[str, Field(description="The agent ID claiming the feature")] +) -> str: + """Atomically get and claim the next available feature for an agent. + + This is the preferred method for parallel agents to avoid race conditions. + It combines feature_get_next and feature_mark_in_progress into a single + atomic operation. + + Args: + agent_id: The agent ID claiming the feature + + Returns: + JSON with the claimed feature details, or error if no features available. + """ + session = get_session() + try: + # Find the next available feature not assigned to another agent + feature = ( + session.query(Feature) + .filter(Feature.passes == False) + .filter( + (Feature.in_progress == False) | + (Feature.assigned_agent_id == None) | + (Feature.assigned_agent_id == "") | + (Feature.assigned_agent_id == agent_id) + ) + .order_by(Feature.priority.asc(), Feature.id.asc()) + .with_for_update() # Lock the row + .first() + ) + + if feature is None: + return json.dumps({"error": "No features available to claim. All are passing or assigned."}) + + # Claim the feature + feature.in_progress = True + feature.assigned_agent_id = agent_id + session.commit() + session.refresh(feature) + + return json.dumps(feature.to_dict(), indent=2) + except Exception as e: + session.rollback() + return json.dumps({"error": f"Failed to claim feature: {str(e)}"}) + finally: + session.close() + + +@mcp.tool() +def feature_release( + feature_id: Annotated[int, Field(description="The ID of the feature to release", ge=1)], + agent_id: Annotated[str, Field(default="", description="The agent ID releasing the feature")] = "" +) -> str: + """Release a feature back to the queue without marking it as passing. + + Use this when an agent needs to stop working on a feature but hasn't + completed it. The feature will be available for other agents to claim. + + Args: + feature_id: The ID of the feature to release + agent_id: Optional agent ID for verification + + Returns: + JSON with the updated feature details, or error if not found. + """ + session = get_session() + try: + feature = session.query(Feature).filter(Feature.id == feature_id).first() + + if feature is None: + return json.dumps({"error": f"Feature with ID {feature_id} not found"}) + + # Only release if the agent owns it or no agent specified + if agent_id and feature.assigned_agent_id and feature.assigned_agent_id != agent_id: + return json.dumps({ + "error": f"Feature is assigned to agent {feature.assigned_agent_id}, not {agent_id}" + }) + + feature.in_progress = False + feature.assigned_agent_id = None + session.commit() + session.refresh(feature) + + return json.dumps({ + "released": True, + "feature": feature.to_dict(), + "message": f"Feature '{feature.name}' released back to queue" + }, indent=2) + finally: + session.close() + + @mcp.tool() def feature_create_bulk( features: Annotated[list[dict], Field(description="List of features to create, each with category, name, description, and steps")] diff --git a/parallel_agent_runner.py b/parallel_agent_runner.py new file mode 100644 index 00000000..348b1b85 --- /dev/null +++ b/parallel_agent_runner.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python3 +""" +Parallel Agent Runner +===================== + +Runs a single agent instance as part of a parallel agent pool. +Each agent runs in its own worktree and identifies itself with an agent_id. + +This script is spawned by the ParallelAgentOrchestrator for each agent. +""" + +import argparse +import asyncio +import os +from pathlib import Path + +from dotenv import load_dotenv + +load_dotenv() + +from agent import run_autonomous_agent +from registry import get_project_path + + +DEFAULT_MODEL = "claude-sonnet-4-5-20250929" + + +def parse_args() -> argparse.Namespace: + """Parse command line arguments.""" + parser = argparse.ArgumentParser( + description="Parallel Agent Runner - Single agent in parallel pool", + ) + + parser.add_argument( + "--project-dir", + type=str, + required=True, + help="Main project directory (for database access)", + ) + + parser.add_argument( + "--worktree-dir", + type=str, + required=True, + help="Git worktree directory (agent's working directory)", + ) + + parser.add_argument( + "--agent-id", + type=str, + required=True, + help="Unique identifier for this agent", + ) + + parser.add_argument( + "--model", + type=str, + default=DEFAULT_MODEL, + help=f"Claude model to use (default: {DEFAULT_MODEL})", + ) + + parser.add_argument( + "--max-iterations", + type=int, + default=None, + help="Maximum iterations (default: unlimited)", + ) + + parser.add_argument( + "--yolo", + action="store_true", + default=False, + help="Enable YOLO mode (no browser testing)", + ) + + return parser.parse_args() + + +def main() -> None: + """Main entry point.""" + args = parse_args() + + project_dir = Path(args.project_dir).resolve() + worktree_dir = Path(args.worktree_dir).resolve() + + if not project_dir.exists(): + print(f"Error: Project directory does not exist: {project_dir}") + return + + if not worktree_dir.exists(): + print(f"Error: Worktree directory does not exist: {worktree_dir}") + return + + print(f"[{args.agent_id}] Starting agent") + print(f"[{args.agent_id}] Project dir: {project_dir}") + print(f"[{args.agent_id}] Worktree dir: {worktree_dir}") + + # Set environment variable for MCP server to use main project dir for database + os.environ["PROJECT_DIR"] = str(project_dir) + os.environ["AGENT_ID"] = args.agent_id + + try: + asyncio.run( + run_autonomous_agent( + project_dir=worktree_dir, # Agent works in worktree + model=args.model, + max_iterations=args.max_iterations, + yolo_mode=args.yolo, + agent_id=args.agent_id, # Pass agent_id to agent + ) + ) + except KeyboardInterrupt: + print(f"\n\n[{args.agent_id}] Interrupted by user") + except Exception as e: + print(f"\n[{args.agent_id}] Fatal error: {e}") + raise + + +if __name__ == "__main__": + main() diff --git a/parallel_agents.py b/parallel_agents.py new file mode 100644 index 00000000..780f1f7a --- /dev/null +++ b/parallel_agents.py @@ -0,0 +1,383 @@ +""" +Parallel Agent Orchestrator +=========================== + +Manages multiple Claude agents working in parallel on the same project. +Each agent gets its own git worktree for isolation while sharing +the same feature database. +""" + +import asyncio +import logging +import subprocess +import sys +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path +from typing import Callable, Awaitable, Literal, Optional + +import psutil + +from worktree import WorktreeManager + +logger = logging.getLogger(__name__) + + +@dataclass +class AgentInfo: + """Information about a running agent.""" + agent_id: str + process: Optional[subprocess.Popen] = None + worktree_path: Optional[Path] = None + status: Literal["stopped", "running", "paused", "crashed"] = "stopped" + started_at: Optional[datetime] = None + output_task: Optional[asyncio.Task] = None + + +class ParallelAgentOrchestrator: + """ + Orchestrates multiple parallel agents for a project. + + Each agent: + - Gets its own git worktree for code isolation + - Shares the same features.db for task coordination + - Has an independent Claude session + """ + + def __init__( + self, + project_dir: Path, + root_dir: Path, + max_agents: int = 3, + ): + """ + Initialize the orchestrator. + + Args: + project_dir: The main project directory + root_dir: Root directory of autocoder (for scripts) + max_agents: Maximum number of parallel agents + """ + self.project_dir = project_dir.resolve() + self.root_dir = root_dir + self.max_agents = max_agents + self.worktree_manager = WorktreeManager(project_dir) + + # Track running agents + self.agents: dict[str, AgentInfo] = {} + + # Callbacks for output streaming + self._output_callbacks: list[Callable[[str, str], Awaitable[None]]] = [] + self._status_callbacks: list[Callable[[str, str], Awaitable[None]]] = [] + + def add_output_callback(self, callback: Callable[[str, str], Awaitable[None]]) -> None: + """Add callback for agent output (agent_id, line).""" + self._output_callbacks.append(callback) + + def add_status_callback(self, callback: Callable[[str, str], Awaitable[None]]) -> None: + """Add callback for status changes (agent_id, status).""" + self._status_callbacks.append(callback) + + async def _notify_output(self, agent_id: str, line: str) -> None: + """Notify callbacks of agent output.""" + for callback in self._output_callbacks: + try: + await callback(agent_id, line) + except Exception as e: + logger.warning(f"Output callback error: {e}") + + async def _notify_status(self, agent_id: str, status: str) -> None: + """Notify callbacks of status change.""" + for callback in self._status_callbacks: + try: + await callback(agent_id, status) + except Exception as e: + logger.warning(f"Status callback error: {e}") + + def generate_agent_id(self, index: int) -> str: + """Generate a unique agent ID.""" + return f"agent-{index + 1}" + + async def start_agents( + self, + num_agents: int, + yolo_mode: bool = False, + model: str = "claude-sonnet-4-5-20250929", + ) -> dict[str, bool]: + """ + Start multiple agents in parallel. + + Args: + num_agents: Number of agents to start (capped at max_agents) + yolo_mode: Enable YOLO mode (no browser testing) + model: Claude model to use + + Returns: + Dict mapping agent_id to success status + """ + num_agents = min(num_agents, self.max_agents) + results = {} + + # Ensure git repo is initialized + if not self.worktree_manager.ensure_git_repo(): + logger.error("Failed to initialize git repository") + return {self.generate_agent_id(i): False for i in range(num_agents)} + + # Start agents + for i in range(num_agents): + agent_id = self.generate_agent_id(i) + success = await self.start_agent(agent_id, yolo_mode, model) + results[agent_id] = success + + return results + + async def start_agent( + self, + agent_id: str, + yolo_mode: bool = False, + model: str = "claude-sonnet-4-5-20250929", + ) -> bool: + """ + Start a single agent. + + Args: + agent_id: Unique agent identifier + yolo_mode: Enable YOLO mode + model: Claude model to use + + Returns: + True if started successfully + """ + if agent_id in self.agents and self.agents[agent_id].status == "running": + logger.warning(f"Agent {agent_id} is already running") + return False + + # Create worktree for this agent + worktree_path = self.worktree_manager.create_worktree(agent_id) + if worktree_path is None: + logger.error(f"Failed to create worktree for agent {agent_id}") + return False + + # Build command - use the parallel agent script + cmd = [ + sys.executable, + str(self.root_dir / "parallel_agent_runner.py"), + "--project-dir", str(self.project_dir), + "--worktree-dir", str(worktree_path), + "--agent-id", agent_id, + "--model", model, + ] + + if yolo_mode: + cmd.append("--yolo") + + try: + process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + cwd=str(worktree_path), + ) + + agent_info = AgentInfo( + agent_id=agent_id, + process=process, + worktree_path=worktree_path, + status="running", + started_at=datetime.now(), + ) + self.agents[agent_id] = agent_info + + # Start output streaming + agent_info.output_task = asyncio.create_task( + self._stream_output(agent_id) + ) + + await self._notify_status(agent_id, "running") + logger.info(f"Started agent {agent_id} (PID {process.pid})") + return True + + except Exception as e: + logger.exception(f"Failed to start agent {agent_id}") + return False + + async def _stream_output(self, agent_id: str) -> None: + """Stream output from an agent process.""" + agent = self.agents.get(agent_id) + if not agent or not agent.process or not agent.process.stdout: + return + + try: + loop = asyncio.get_running_loop() + while True: + line = await loop.run_in_executor( + None, agent.process.stdout.readline + ) + if not line: + break + + decoded = line.decode("utf-8", errors="replace").rstrip() + await self._notify_output(agent_id, decoded) + + except asyncio.CancelledError: + raise + except Exception as e: + logger.warning(f"Output streaming error for {agent_id}: {e}") + finally: + # Check if process ended + if agent.process and agent.process.poll() is not None: + exit_code = agent.process.returncode + if exit_code != 0 and agent.status == "running": + agent.status = "crashed" + elif agent.status == "running": + agent.status = "stopped" + await self._notify_status(agent_id, agent.status) + + async def stop_agent(self, agent_id: str) -> bool: + """Stop a single agent.""" + agent = self.agents.get(agent_id) + if not agent or not agent.process: + return False + + try: + # Cancel output streaming + if agent.output_task: + agent.output_task.cancel() + try: + await agent.output_task + except asyncio.CancelledError: + pass + + # Terminate process + agent.process.terminate() + + loop = asyncio.get_running_loop() + try: + await asyncio.wait_for( + loop.run_in_executor(None, agent.process.wait), + timeout=5.0 + ) + except asyncio.TimeoutError: + agent.process.kill() + await loop.run_in_executor(None, agent.process.wait) + + agent.status = "stopped" + agent.process = None + await self._notify_status(agent_id, "stopped") + + logger.info(f"Stopped agent {agent_id}") + return True + + except Exception as e: + logger.exception(f"Failed to stop agent {agent_id}") + return False + + async def stop_all_agents(self) -> None: + """Stop all running agents.""" + tasks = [] + for agent_id in list(self.agents.keys()): + if self.agents[agent_id].status == "running": + tasks.append(self.stop_agent(agent_id)) + + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + + async def pause_agent(self, agent_id: str) -> bool: + """Pause an agent using psutil.""" + agent = self.agents.get(agent_id) + if not agent or not agent.process or agent.status != "running": + return False + + try: + proc = psutil.Process(agent.process.pid) + proc.suspend() + agent.status = "paused" + await self._notify_status(agent_id, "paused") + return True + except Exception as e: + logger.exception(f"Failed to pause agent {agent_id}") + return False + + async def resume_agent(self, agent_id: str) -> bool: + """Resume a paused agent.""" + agent = self.agents.get(agent_id) + if not agent or not agent.process or agent.status != "paused": + return False + + try: + proc = psutil.Process(agent.process.pid) + proc.resume() + agent.status = "running" + await self._notify_status(agent_id, "running") + return True + except Exception as e: + logger.exception(f"Failed to resume agent {agent_id}") + return False + + def get_agent_status(self, agent_id: str) -> dict: + """Get status of a single agent.""" + agent = self.agents.get(agent_id) + if not agent: + return {"agent_id": agent_id, "status": "unknown"} + + return { + "agent_id": agent_id, + "status": agent.status, + "pid": agent.process.pid if agent.process else None, + "started_at": agent.started_at.isoformat() if agent.started_at else None, + "worktree_path": str(agent.worktree_path) if agent.worktree_path else None, + } + + def get_all_statuses(self) -> list[dict]: + """Get status of all agents.""" + return [self.get_agent_status(aid) for aid in self.agents] + + async def healthcheck(self) -> dict[str, bool]: + """Check health of all agents.""" + results = {} + for agent_id, agent in self.agents.items(): + if not agent.process: + results[agent_id] = agent.status == "stopped" + continue + + poll = agent.process.poll() + if poll is not None: + if agent.status in ("running", "paused"): + agent.status = "crashed" + await self._notify_status(agent_id, "crashed") + results[agent_id] = False + else: + results[agent_id] = True + + return results + + async def merge_all_worktrees(self) -> dict[str, bool]: + """Merge changes from all agent worktrees back to main.""" + results = {} + for agent_id in self.agents: + success = self.worktree_manager.merge_worktree_changes(agent_id) + results[agent_id] = success + return results + + async def cleanup(self) -> None: + """Stop all agents and clean up worktrees.""" + await self.stop_all_agents() + self.worktree_manager.cleanup_all_worktrees() + + +# Global orchestrator registry +_orchestrators: dict[str, ParallelAgentOrchestrator] = {} + + +def get_orchestrator( + project_name: str, + project_dir: Path, + root_dir: Path, + max_agents: int = 3, +) -> ParallelAgentOrchestrator: + """Get or create an orchestrator for a project.""" + if project_name not in _orchestrators: + _orchestrators[project_name] = ParallelAgentOrchestrator( + project_dir, root_dir, max_agents + ) + return _orchestrators[project_name] diff --git a/server/main.py b/server/main.py index 370a4901..113d07ac 100644 --- a/server/main.py +++ b/server/main.py @@ -15,7 +15,7 @@ from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse -from .routers import projects_router, features_router, agent_router, spec_creation_router, filesystem_router, assistant_chat_router +from .routers import projects_router, features_router, agent_router, spec_creation_router, filesystem_router, assistant_chat_router, parallel_agents_router from .websocket import project_websocket from .services.process_manager import cleanup_all_managers from .services.assistant_chat_session import cleanup_all_sessions as cleanup_assistant_sessions @@ -83,6 +83,7 @@ async def require_localhost(request: Request, call_next): app.include_router(projects_router) app.include_router(features_router) app.include_router(agent_router) +app.include_router(parallel_agents_router) # Parallel agent management app.include_router(spec_creation_router) app.include_router(filesystem_router) app.include_router(assistant_chat_router) diff --git a/server/routers/__init__.py b/server/routers/__init__.py index 0cdd8293..16aaba76 100644 --- a/server/routers/__init__.py +++ b/server/routers/__init__.py @@ -8,6 +8,7 @@ from .projects import router as projects_router from .features import router as features_router from .agent import router as agent_router +from .parallel_agents import router as parallel_agents_router from .spec_creation import router as spec_creation_router from .filesystem import router as filesystem_router from .assistant_chat import router as assistant_chat_router @@ -16,6 +17,7 @@ "projects_router", "features_router", "agent_router", + "parallel_agents_router", "spec_creation_router", "filesystem_router", "assistant_chat_router", diff --git a/server/routers/parallel_agents.py b/server/routers/parallel_agents.py new file mode 100644 index 00000000..adbf3901 --- /dev/null +++ b/server/routers/parallel_agents.py @@ -0,0 +1,224 @@ +""" +Parallel Agents Router +====================== + +API endpoints for parallel agent control (start/stop multiple agents). +Uses git worktrees for agent isolation. +""" + +import re +from pathlib import Path + +from fastapi import APIRouter, HTTPException + +from ..schemas import ( + ParallelAgentStartRequest, + ParallelAgentsStatus, + ParallelAgentInfo, + ParallelAgentActionResponse, +) + + +def _get_project_path(project_name: str) -> Path: + """Get project path from registry.""" + import sys + root = Path(__file__).parent.parent.parent + if str(root) not in sys.path: + sys.path.insert(0, str(root)) + + from registry import get_project_path + return get_project_path(project_name) + + +router = APIRouter(prefix="/api/projects/{project_name}/parallel-agents", tags=["parallel-agents"]) + +# Root directory for autocoder +ROOT_DIR = Path(__file__).parent.parent.parent + + +def validate_project_name(name: str) -> str: + """Validate and sanitize project name to prevent path traversal.""" + if not re.match(r'^[a-zA-Z0-9_-]{1,50}$', name): + raise HTTPException( + status_code=400, + detail="Invalid project name" + ) + return name + + +def get_orchestrator(project_name: str): + """Get or create the parallel agent orchestrator for a project.""" + from parallel_agents import get_orchestrator as _get_orchestrator + + project_name = validate_project_name(project_name) + project_dir = _get_project_path(project_name) + + if not project_dir: + raise HTTPException(status_code=404, detail=f"Project '{project_name}' not found in registry") + + if not project_dir.exists(): + raise HTTPException(status_code=404, detail=f"Project directory not found: {project_dir}") + + return _get_orchestrator(project_name, project_dir, ROOT_DIR) + + +@router.get("/status", response_model=ParallelAgentsStatus) +async def get_parallel_agents_status(project_name: str): + """Get the status of all parallel agents for a project.""" + orchestrator = get_orchestrator(project_name) + + # Run healthcheck + await orchestrator.healthcheck() + + agents = [] + running_count = 0 + + for agent_id, agent in orchestrator.agents.items(): + agents.append(ParallelAgentInfo( + agent_id=agent_id, + status=agent.status, + pid=agent.process.pid if agent.process else None, + started_at=agent.started_at, + worktree_path=str(agent.worktree_path) if agent.worktree_path else None, + )) + if agent.status == "running": + running_count += 1 + + return ParallelAgentsStatus( + agents=agents, + total_running=running_count, + max_agents=orchestrator.max_agents, + ) + + +@router.post("/start", response_model=ParallelAgentActionResponse) +async def start_parallel_agents( + project_name: str, + request: ParallelAgentStartRequest = ParallelAgentStartRequest(), +): + """Start multiple parallel agents for a project.""" + orchestrator = get_orchestrator(project_name) + + results = await orchestrator.start_agents( + num_agents=request.num_agents, + yolo_mode=request.yolo_mode, + ) + + all_success = all(results.values()) + success_count = sum(1 for v in results.values() if v) + + return ParallelAgentActionResponse( + success=all_success, + agents=results, + message=f"Started {success_count}/{request.num_agents} agents", + ) + + +@router.post("/stop", response_model=ParallelAgentActionResponse) +async def stop_all_parallel_agents(project_name: str): + """Stop all parallel agents for a project.""" + orchestrator = get_orchestrator(project_name) + + # Get list of running agents before stopping + running_agents = [ + aid for aid, agent in orchestrator.agents.items() + if agent.status == "running" + ] + + await orchestrator.stop_all_agents() + + # Build results dict + results = {aid: True for aid in running_agents} + + return ParallelAgentActionResponse( + success=True, + agents=results, + message=f"Stopped {len(running_agents)} agents", + ) + + +@router.post("/{agent_id}/start") +async def start_single_agent( + project_name: str, + agent_id: str, + yolo_mode: bool = False, +): + """Start a single parallel agent.""" + orchestrator = get_orchestrator(project_name) + + success = await orchestrator.start_agent(agent_id, yolo_mode=yolo_mode) + + return { + "success": success, + "agent_id": agent_id, + "status": orchestrator.agents.get(agent_id, {}).status if agent_id in orchestrator.agents else "unknown", + } + + +@router.post("/{agent_id}/stop") +async def stop_single_agent(project_name: str, agent_id: str): + """Stop a single parallel agent.""" + orchestrator = get_orchestrator(project_name) + + success = await orchestrator.stop_agent(agent_id) + + return { + "success": success, + "agent_id": agent_id, + "status": "stopped" if success else "unknown", + } + + +@router.post("/{agent_id}/pause") +async def pause_single_agent(project_name: str, agent_id: str): + """Pause a single parallel agent.""" + orchestrator = get_orchestrator(project_name) + + success = await orchestrator.pause_agent(agent_id) + + return { + "success": success, + "agent_id": agent_id, + "status": "paused" if success else "unknown", + } + + +@router.post("/{agent_id}/resume") +async def resume_single_agent(project_name: str, agent_id: str): + """Resume a single parallel agent.""" + orchestrator = get_orchestrator(project_name) + + success = await orchestrator.resume_agent(agent_id) + + return { + "success": success, + "agent_id": agent_id, + "status": "running" if success else "unknown", + } + + +@router.post("/merge") +async def merge_all_worktrees(project_name: str): + """Merge changes from all agent worktrees back to main branch.""" + orchestrator = get_orchestrator(project_name) + + results = await orchestrator.merge_all_worktrees() + + return { + "success": all(results.values()), + "agents": results, + "message": f"Merged {sum(1 for v in results.values() if v)}/{len(results)} worktrees", + } + + +@router.post("/cleanup") +async def cleanup_parallel_agents(project_name: str): + """Stop all agents and clean up worktrees.""" + orchestrator = get_orchestrator(project_name) + + await orchestrator.cleanup() + + return { + "success": True, + "message": "All agents stopped and worktrees cleaned up", + } diff --git a/server/schemas.py b/server/schemas.py index 723d4609..95b0cb9c 100644 --- a/server/schemas.py +++ b/server/schemas.py @@ -84,6 +84,7 @@ class FeatureResponse(FeatureBase): priority: int passes: bool in_progress: bool + assigned_agent_id: str | None = None # Agent working on this feature class Config: from_attributes = True @@ -120,6 +121,39 @@ class AgentActionResponse(BaseModel): message: str = "" +# ============================================================================ +# Parallel Agent Schemas +# ============================================================================ + +class ParallelAgentStartRequest(BaseModel): + """Request schema for starting parallel agents.""" + num_agents: int = Field(default=2, ge=1, le=10, description="Number of agents to start") + yolo_mode: bool = False + + +class ParallelAgentInfo(BaseModel): + """Information about a single parallel agent.""" + agent_id: str + status: Literal["stopped", "running", "paused", "crashed", "unknown"] + pid: int | None = None + started_at: datetime | None = None + worktree_path: str | None = None + + +class ParallelAgentsStatus(BaseModel): + """Status of all parallel agents.""" + agents: list[ParallelAgentInfo] + total_running: int + max_agents: int + + +class ParallelAgentActionResponse(BaseModel): + """Response for parallel agent control actions.""" + success: bool + agents: dict[str, bool] # agent_id -> success + message: str = "" + + # ============================================================================ # Setup Schemas # ============================================================================ diff --git a/ui/src/components/FeatureCard.tsx b/ui/src/components/FeatureCard.tsx index c7190fb3..734d04c2 100644 --- a/ui/src/components/FeatureCard.tsx +++ b/ui/src/components/FeatureCard.tsx @@ -1,4 +1,4 @@ -import { CheckCircle2, Circle, Loader2 } from 'lucide-react' +import { CheckCircle2, Circle, Loader2, Bot } from 'lucide-react' import type { Feature } from '../lib/types' interface FeatureCardProps { @@ -27,6 +27,26 @@ function getCategoryColor(category: string): string { return colors[Math.abs(hash) % colors.length] } +// Generate consistent color for agent ID +function getAgentColor(agentId: string): string { + const colors = [ + '#00b4d8', // cyan - agent-1 + '#70e000', // green - agent-2 + '#8338ec', // purple - agent-3 + '#ff5400', // orange - agent-4 + '#ff006e', // pink - agent-5 + ] + + // Extract number from agent-N format + const match = agentId.match(/\d+/) + if (match) { + const num = parseInt(match[0], 10) - 1 + return colors[num % colors.length] + } + + return colors[0] +} + export function FeatureCard({ feature, onClick, isInProgress }: FeatureCardProps) { const categoryColor = getCategoryColor(feature.category) @@ -63,22 +83,39 @@ export function FeatureCard({ feature, onClick, isInProgress }: FeatureCardProps

{/* Status */} -
- {isInProgress ? ( - <> - - Processing... - - ) : feature.passes ? ( - <> - - Complete - - ) : ( - <> - - Pending - +
+
+ {isInProgress ? ( + <> + + Processing... + + ) : feature.passes ? ( + <> + + Complete + + ) : ( + <> + + Pending + + )} +
+ + {/* Agent badge */} + {feature.assigned_agent_id && ( +
+ + {feature.assigned_agent_id.replace('agent-', '#')} +
)}
diff --git a/ui/src/lib/types.ts b/ui/src/lib/types.ts index c5de1958..5e81179a 100644 --- a/ui/src/lib/types.ts +++ b/ui/src/lib/types.ts @@ -66,6 +66,7 @@ export interface Feature { steps: string[] passes: boolean in_progress: boolean + assigned_agent_id: string | null // Agent working on this feature } export interface FeatureListResponse { @@ -98,6 +99,27 @@ export interface AgentActionResponse { message: string } +// Parallel agent types +export interface ParallelAgentInfo { + agent_id: string + status: 'stopped' | 'running' | 'paused' | 'crashed' | 'unknown' + pid: number | null + started_at: string | null + worktree_path: string | null +} + +export interface ParallelAgentsStatus { + agents: ParallelAgentInfo[] + total_running: number + max_agents: number +} + +export interface ParallelAgentActionResponse { + success: boolean + agents: Record + message: string +} + // Setup types export interface SetupStatus { claude_cli: boolean diff --git a/ui/tsconfig.tsbuildinfo b/ui/tsconfig.tsbuildinfo index 8bf9c84a..973c5f0f 100644 --- a/ui/tsconfig.tsbuildinfo +++ b/ui/tsconfig.tsbuildinfo @@ -1 +1 @@ -{"root":["./src/app.tsx","./src/main.tsx","./src/vite-env.d.ts","./src/components/addfeatureform.tsx","./src/components/agentcontrol.tsx","./src/components/agentthought.tsx","./src/components/assistantchat.tsx","./src/components/assistantfab.tsx","./src/components/assistantpanel.tsx","./src/components/chatmessage.tsx","./src/components/debuglogviewer.tsx","./src/components/featurecard.tsx","./src/components/featuremodal.tsx","./src/components/folderbrowser.tsx","./src/components/kanbanboard.tsx","./src/components/kanbancolumn.tsx","./src/components/newprojectmodal.tsx","./src/components/progressdashboard.tsx","./src/components/projectselector.tsx","./src/components/questionoptions.tsx","./src/components/setupwizard.tsx","./src/components/speccreationchat.tsx","./src/components/typingindicator.tsx","./src/hooks/useassistantchat.ts","./src/hooks/usecelebration.ts","./src/hooks/usefeaturesound.ts","./src/hooks/useprojects.ts","./src/hooks/usespecchat.ts","./src/hooks/usewebsocket.ts","./src/lib/api.ts","./src/lib/types.ts"],"version":"5.6.3"} \ No newline at end of file +{"root":["./src/App.tsx","./src/main.tsx","./src/vite-env.d.ts","./src/components/AddFeatureForm.tsx","./src/components/AgentControl.tsx","./src/components/AgentThought.tsx","./src/components/AssistantChat.tsx","./src/components/AssistantFAB.tsx","./src/components/AssistantPanel.tsx","./src/components/ChatMessage.tsx","./src/components/DebugLogViewer.tsx","./src/components/FeatureCard.tsx","./src/components/FeatureModal.tsx","./src/components/FolderBrowser.tsx","./src/components/KanbanBoard.tsx","./src/components/KanbanColumn.tsx","./src/components/NewProjectModal.tsx","./src/components/ProgressDashboard.tsx","./src/components/ProjectSelector.tsx","./src/components/QuestionOptions.tsx","./src/components/SetupWizard.tsx","./src/components/SpecCreationChat.tsx","./src/components/TypingIndicator.tsx","./src/hooks/useAssistantChat.ts","./src/hooks/useCelebration.ts","./src/hooks/useFeatureSound.ts","./src/hooks/useProjects.ts","./src/hooks/useSpecChat.ts","./src/hooks/useWebSocket.ts","./src/lib/api.ts","./src/lib/types.ts"],"version":"5.6.3"} \ No newline at end of file diff --git a/worktree.py b/worktree.py new file mode 100644 index 00000000..490850e7 --- /dev/null +++ b/worktree.py @@ -0,0 +1,306 @@ +""" +Git Worktree Management +======================= + +Manages git worktrees for parallel agent execution. +Each agent gets its own isolated working directory while sharing +the same git repository and feature database. +""" + +import logging +import subprocess +from pathlib import Path +from typing import Optional + +logger = logging.getLogger(__name__) + + +class WorktreeManager: + """ + Manages git worktrees for parallel agents. + + Each worktree provides an isolated working directory where an agent + can make changes without conflicting with other agents. + """ + + def __init__(self, project_dir: Path): + """ + Initialize the worktree manager. + + Args: + project_dir: The main project directory (must be a git repo) + """ + self.project_dir = project_dir.resolve() + self.worktrees_dir = self.project_dir.parent / f".{self.project_dir.name}_worktrees" + + def _run_git(self, *args: str, cwd: Optional[Path] = None) -> subprocess.CompletedProcess: + """Run a git command and return the result.""" + cmd = ["git"] + list(args) + return subprocess.run( + cmd, + cwd=cwd or self.project_dir, + capture_output=True, + text=True, + ) + + def is_git_repo(self) -> bool: + """Check if the project directory is a git repository.""" + result = self._run_git("rev-parse", "--git-dir") + return result.returncode == 0 + + def ensure_git_repo(self) -> bool: + """ + Ensure the project directory is a git repository. + Initializes one if it doesn't exist. + + Returns: + True if repo exists or was created, False on error + """ + if self.is_git_repo(): + return True + + logger.info(f"Initializing git repository in {self.project_dir}") + result = self._run_git("init") + if result.returncode != 0: + logger.error(f"Failed to initialize git repo: {result.stderr}") + return False + + # Create initial commit so worktrees can branch from it + result = self._run_git("add", "-A") + if result.returncode != 0: + logger.warning(f"Git add failed: {result.stderr}") + + result = self._run_git("commit", "-m", "Initial commit for parallel agents", "--allow-empty") + if result.returncode != 0: + logger.warning(f"Git commit failed: {result.stderr}") + + return True + + def get_worktree_path(self, agent_id: str) -> Path: + """Get the path for an agent's worktree.""" + return self.worktrees_dir / f"agent_{agent_id}" + + def create_worktree(self, agent_id: str, branch_name: Optional[str] = None) -> Optional[Path]: + """ + Create a worktree for an agent. + + Args: + agent_id: Unique identifier for the agent + branch_name: Optional branch name (defaults to agent-{agent_id}) + + Returns: + Path to the worktree, or None on failure + """ + if not self.ensure_git_repo(): + return None + + worktree_path = self.get_worktree_path(agent_id) + branch = branch_name or f"agent-{agent_id}" + + # Check if worktree already exists + if worktree_path.exists(): + logger.info(f"Worktree already exists for agent {agent_id}") + return worktree_path + + # Ensure worktrees directory exists + self.worktrees_dir.mkdir(parents=True, exist_ok=True) + + # Get current branch/HEAD for base + result = self._run_git("rev-parse", "HEAD") + if result.returncode != 0: + # No commits yet, create one + self._run_git("commit", "--allow-empty", "-m", "Initial commit") + result = self._run_git("rev-parse", "HEAD") + if result.returncode != 0: + logger.error("Failed to get HEAD ref") + return None + + base_ref = result.stdout.strip() + + # Create the worktree with a new branch + result = self._run_git( + "worktree", "add", + "-b", branch, + str(worktree_path), + base_ref + ) + + if result.returncode != 0: + # Branch might already exist, try without -b + result = self._run_git( + "worktree", "add", + str(worktree_path), + branch + ) + + if result.returncode != 0: + logger.error(f"Failed to create worktree: {result.stderr}") + return None + + logger.info(f"Created worktree for agent {agent_id} at {worktree_path}") + + # Copy features.db to worktree (symlink would be better but cross-platform issues) + self._setup_shared_resources(worktree_path) + + return worktree_path + + def _setup_shared_resources(self, worktree_path: Path) -> None: + """ + Set up shared resources in the worktree. + + The features.db is shared across all worktrees via symlink. + """ + # Create symlink to shared features.db + features_db = self.project_dir / "features.db" + worktree_db = worktree_path / "features.db" + + if features_db.exists() and not worktree_db.exists(): + try: + # Try symlink first (preferred) + worktree_db.symlink_to(features_db) + logger.debug(f"Created symlink to features.db in {worktree_path}") + except OSError: + # Fallback: use the same path in MCP server config + logger.debug("Symlink not supported, MCP server will use main project path") + + # Copy prompts directory if it exists + prompts_dir = self.project_dir / "prompts" + worktree_prompts = worktree_path / "prompts" + if prompts_dir.exists() and not worktree_prompts.exists(): + try: + worktree_prompts.symlink_to(prompts_dir) + except OSError: + import shutil + shutil.copytree(prompts_dir, worktree_prompts) + + def remove_worktree(self, agent_id: str) -> bool: + """ + Remove a worktree for an agent. + + Args: + agent_id: Unique identifier for the agent + + Returns: + True if successful, False otherwise + """ + worktree_path = self.get_worktree_path(agent_id) + + if not worktree_path.exists(): + return True + + # Remove the worktree + result = self._run_git("worktree", "remove", str(worktree_path), "--force") + + if result.returncode != 0: + logger.warning(f"Failed to remove worktree: {result.stderr}") + # Try manual removal + import shutil + try: + shutil.rmtree(worktree_path) + except Exception as e: + logger.error(f"Failed to manually remove worktree: {e}") + return False + + # Prune worktree references + self._run_git("worktree", "prune") + + logger.info(f"Removed worktree for agent {agent_id}") + return True + + def list_worktrees(self) -> list[dict]: + """ + List all active worktrees. + + Returns: + List of dicts with worktree info (path, branch, head) + """ + result = self._run_git("worktree", "list", "--porcelain") + + if result.returncode != 0: + return [] + + worktrees = [] + current = {} + + for line in result.stdout.strip().split("\n"): + if not line: + if current: + worktrees.append(current) + current = {} + continue + + if line.startswith("worktree "): + current["path"] = line[9:] + elif line.startswith("HEAD "): + current["head"] = line[5:] + elif line.startswith("branch "): + current["branch"] = line[7:] + elif line == "bare": + current["bare"] = True + elif line == "detached": + current["detached"] = True + + if current: + worktrees.append(current) + + return worktrees + + def merge_worktree_changes(self, agent_id: str, commit_message: Optional[str] = None) -> bool: + """ + Merge changes from an agent's worktree back to main branch. + + Args: + agent_id: Unique identifier for the agent + commit_message: Optional commit message + + Returns: + True if successful, False otherwise + """ + worktree_path = self.get_worktree_path(agent_id) + branch = f"agent-{agent_id}" + + if not worktree_path.exists(): + logger.error(f"Worktree for agent {agent_id} does not exist") + return False + + # Commit any uncommitted changes in the worktree + self._run_git("add", "-A", cwd=worktree_path) + msg = commit_message or f"Agent {agent_id} changes" + self._run_git("commit", "-m", msg, "--allow-empty", cwd=worktree_path) + + # Get current branch in main repo + result = self._run_git("branch", "--show-current") + main_branch = result.stdout.strip() or "main" + + # Merge the agent branch into main + result = self._run_git("merge", branch, "--no-edit") + + if result.returncode != 0: + logger.warning(f"Merge conflict for agent {agent_id}: {result.stderr}") + # Abort the merge + self._run_git("merge", "--abort") + return False + + logger.info(f"Merged changes from agent {agent_id}") + return True + + def cleanup_all_worktrees(self) -> None: + """Remove all worktrees and clean up.""" + worktrees = self.list_worktrees() + + for wt in worktrees: + path = Path(wt.get("path", "")) + if path != self.project_dir and str(self.worktrees_dir) in str(path): + # Extract agent_id from path + agent_id = path.name.replace("agent_", "") + self.remove_worktree(agent_id) + + # Clean up worktrees directory if empty + if self.worktrees_dir.exists(): + try: + self.worktrees_dir.rmdir() + except OSError: + pass # Directory not empty + + # Prune stale worktree references + self._run_git("worktree", "prune") From 52c0680fe9d9c4825108a6c5dc66ba5582cdbcb5 Mon Sep 17 00:00:00 2001 From: Ofer Shaal Date: Tue, 6 Jan 2026 17:02:42 -0500 Subject: [PATCH 2/7] feat: Implement parallel agent control in UI Add support for managing parallel agents in the UI, enhancing project execution capabilities. Key changes include: - Introduced `ParallelAgentControl` component for toggling parallel mode. - Updated `App` component to conditionally render agent controls based on parallel mode. - Added hooks for querying and mutating parallel agent statuses in `useProjects`. - Implemented API functions for starting, stopping, merging, and cleaning up parallel agents. This update improves the user experience by allowing simultaneous agent operations, streamlining project workflows. --- ui/src/App.tsx | 25 +- ui/src/components/ParallelAgentControl.tsx | 286 +++++++++++++++++++++ ui/src/hooks/useProjects.ts | 61 +++++ ui/src/lib/api.ts | 47 ++++ ui/tsconfig.tsbuildinfo | 2 +- 5 files changed, 415 insertions(+), 6 deletions(-) create mode 100644 ui/src/components/ParallelAgentControl.tsx diff --git a/ui/src/App.tsx b/ui/src/App.tsx index 96066b24..e015f0a0 100644 --- a/ui/src/App.tsx +++ b/ui/src/App.tsx @@ -8,6 +8,7 @@ const STORAGE_KEY = 'autonomous-coder-selected-project' import { ProjectSelector } from './components/ProjectSelector' import { KanbanBoard } from './components/KanbanBoard' import { AgentControl } from './components/AgentControl' +import { ParallelAgentControl } from './components/ParallelAgentControl' import { ProgressDashboard } from './components/ProgressDashboard' import { SetupWizard } from './components/SetupWizard' import { AddFeatureForm } from './components/AddFeatureForm' @@ -34,6 +35,7 @@ function App() { const [debugOpen, setDebugOpen] = useState(false) const [debugPanelHeight, setDebugPanelHeight] = useState(288) // Default height const [assistantOpen, setAssistantOpen] = useState(false) + const [parallelMode, setParallelMode] = useState(false) const { data: projects, isLoading: projectsLoading } = useProjects() const { data: features } = useFeatures(selectedProject) @@ -160,11 +162,24 @@ function App() { - + {!parallelMode ? ( + <> + + + + ) : ( + + )} )}
diff --git a/ui/src/components/ParallelAgentControl.tsx b/ui/src/components/ParallelAgentControl.tsx new file mode 100644 index 00000000..c188fa4f --- /dev/null +++ b/ui/src/components/ParallelAgentControl.tsx @@ -0,0 +1,286 @@ +import { useState } from 'react' +import { + Play, + Square, + Loader2, + Zap, + Users, + Bot, + GitMerge, + Trash2, + Minus, + Plus, +} from 'lucide-react' +import { + useParallelAgentsStatus, + useStartParallelAgents, + useStopParallelAgents, + useMergeParallelWorktrees, + useCleanupParallelAgents, +} from '../hooks/useProjects' +import type { ParallelAgentInfo } from '../lib/types' + +interface ParallelAgentControlProps { + projectName: string + onModeChange?: (isParallel: boolean) => void +} + +// Agent colors matching FeatureCard.tsx +const AGENT_COLORS = [ + '#00b4d8', // cyan - agent-1 + '#70e000', // green - agent-2 + '#8338ec', // purple - agent-3 + '#ff5400', // orange - agent-4 + '#ff006e', // pink - agent-5 +] + +function getAgentColor(agentId: string): string { + const match = agentId.match(/\d+/) + if (match) { + const num = parseInt(match[0], 10) - 1 + return AGENT_COLORS[num % AGENT_COLORS.length] + } + return AGENT_COLORS[0] +} + +export function ParallelAgentControl({ + projectName, + onModeChange, +}: ParallelAgentControlProps) { + const [numAgents, setNumAgents] = useState(2) + const [yoloEnabled, setYoloEnabled] = useState(false) + const [isParallelMode, setIsParallelMode] = useState(false) + + const { data: status, isLoading: statusLoading } = useParallelAgentsStatus( + isParallelMode ? projectName : null + ) + + const startAgents = useStartParallelAgents(projectName) + const stopAgents = useStopParallelAgents(projectName) + const mergeWorktrees = useMergeParallelWorktrees(projectName) + const cleanupAgents = useCleanupParallelAgents(projectName) + + const isLoading = + startAgents.isPending || + stopAgents.isPending || + mergeWorktrees.isPending || + cleanupAgents.isPending + + const hasRunningAgents = (status?.total_running ?? 0) > 0 + + const handleToggleMode = () => { + const newMode = !isParallelMode + setIsParallelMode(newMode) + onModeChange?.(newMode) + } + + const handleStart = () => { + startAgents.mutate({ numAgents, yoloMode: yoloEnabled }) + } + + const handleStop = () => { + stopAgents.mutate() + } + + const handleMerge = () => { + mergeWorktrees.mutate() + } + + const handleCleanup = () => { + cleanupAgents.mutate() + } + + if (!isParallelMode) { + return ( + + ) + } + + return ( +
+ {/* Header */} +
+
+ + + Parallel Agents + +
+ +
+ + {/* Agent Status Grid */} + {status && status.agents.length > 0 && ( +
+ {status.agents.map((agent) => ( + + ))} +
+ )} + + {/* Controls */} + {!hasRunningAgents ? ( +
+ {/* Agent Count Selector */} +
+ Agents: +
+ + {numAgents} + +
+ + {/* YOLO Toggle */} + +
+ + {/* Start Button */} + +
+ ) : ( +
+ {/* Running indicator */} +
+ + + {status?.total_running} agent{status?.total_running !== 1 ? 's' : ''} running + + {yoloEnabled && ( +
+ + YOLO +
+ )} +
+ + {/* Action Buttons */} +
+ + + +
+
+ )} + + {/* Status Loading */} + {statusLoading && !status && ( +
+ +
+ )} +
+ ) +} + +function AgentStatusBadge({ agent }: { agent: ParallelAgentInfo }) { + const color = getAgentColor(agent.agent_id) + const statusColors: Record = { + running: 'var(--color-neo-done)', + paused: 'var(--color-neo-pending)', + stopped: 'var(--color-neo-text-secondary)', + crashed: 'var(--color-neo-danger)', + unknown: 'var(--color-neo-text-secondary)', + } + + const statusColor = statusColors[agent.status] || statusColors.unknown + const agentNum = agent.agent_id.replace('agent-', '#') + + return ( +
+ + + {agentNum} + + +
+ ) +} diff --git a/ui/src/hooks/useProjects.ts b/ui/src/hooks/useProjects.ts index 0cb61fa5..19f1de5a 100644 --- a/ui/src/hooks/useProjects.ts +++ b/ui/src/hooks/useProjects.ts @@ -151,6 +151,67 @@ export function useResumeAgent(projectName: string) { }) } +// ============================================================================ +// Parallel Agents +// ============================================================================ + +export function useParallelAgentsStatus(projectName: string | null) { + return useQuery({ + queryKey: ['parallel-agents-status', projectName], + queryFn: () => api.getParallelAgentsStatus(projectName!), + enabled: !!projectName, + refetchInterval: 3000, // Poll every 3 seconds + }) +} + +export function useStartParallelAgents(projectName: string) { + const queryClient = useQueryClient() + + return useMutation({ + mutationFn: ({ numAgents, yoloMode }: { numAgents: number; yoloMode?: boolean }) => + api.startParallelAgents(projectName, numAgents, yoloMode), + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: ['parallel-agents-status', projectName] }) + queryClient.invalidateQueries({ queryKey: ['agent-status', projectName] }) + }, + }) +} + +export function useStopParallelAgents(projectName: string) { + const queryClient = useQueryClient() + + return useMutation({ + mutationFn: () => api.stopParallelAgents(projectName), + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: ['parallel-agents-status', projectName] }) + queryClient.invalidateQueries({ queryKey: ['agent-status', projectName] }) + }, + }) +} + +export function useMergeParallelWorktrees(projectName: string) { + const queryClient = useQueryClient() + + return useMutation({ + mutationFn: () => api.mergeParallelWorktrees(projectName), + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: ['parallel-agents-status', projectName] }) + }, + }) +} + +export function useCleanupParallelAgents(projectName: string) { + const queryClient = useQueryClient() + + return useMutation({ + mutationFn: () => api.cleanupParallelAgents(projectName), + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: ['parallel-agents-status', projectName] }) + queryClient.invalidateQueries({ queryKey: ['agent-status', projectName] }) + }, + }) +} + // ============================================================================ // Setup // ============================================================================ diff --git a/ui/src/lib/api.ts b/ui/src/lib/api.ts index bfee6cc9..9e6685cd 100644 --- a/ui/src/lib/api.ts +++ b/ui/src/lib/api.ts @@ -16,6 +16,8 @@ import type { PathValidationResponse, AssistantConversation, AssistantConversationDetail, + ParallelAgentsStatus, + ParallelAgentActionResponse, } from './types' const API_BASE = '/api' @@ -147,6 +149,51 @@ export async function resumeAgent(projectName: string): Promise { + return fetchJSON(`/projects/${encodeURIComponent(projectName)}/parallel-agents/status`) +} + +export async function startParallelAgents( + projectName: string, + numAgents: number = 2, + yoloMode: boolean = false +): Promise { + return fetchJSON(`/projects/${encodeURIComponent(projectName)}/parallel-agents/start`, { + method: 'POST', + body: JSON.stringify({ num_agents: numAgents, yolo_mode: yoloMode }), + }) +} + +export async function stopParallelAgents( + projectName: string +): Promise { + return fetchJSON(`/projects/${encodeURIComponent(projectName)}/parallel-agents/stop`, { + method: 'POST', + }) +} + +export async function mergeParallelWorktrees( + projectName: string +): Promise<{ success: boolean; agents: Record; message: string }> { + return fetchJSON(`/projects/${encodeURIComponent(projectName)}/parallel-agents/merge`, { + method: 'POST', + }) +} + +export async function cleanupParallelAgents( + projectName: string +): Promise<{ success: boolean; message: string }> { + return fetchJSON(`/projects/${encodeURIComponent(projectName)}/parallel-agents/cleanup`, { + method: 'POST', + }) +} + // ============================================================================ // Spec Creation API // ============================================================================ diff --git a/ui/tsconfig.tsbuildinfo b/ui/tsconfig.tsbuildinfo index 973c5f0f..ef4ee4eb 100644 --- a/ui/tsconfig.tsbuildinfo +++ b/ui/tsconfig.tsbuildinfo @@ -1 +1 @@ -{"root":["./src/App.tsx","./src/main.tsx","./src/vite-env.d.ts","./src/components/AddFeatureForm.tsx","./src/components/AgentControl.tsx","./src/components/AgentThought.tsx","./src/components/AssistantChat.tsx","./src/components/AssistantFAB.tsx","./src/components/AssistantPanel.tsx","./src/components/ChatMessage.tsx","./src/components/DebugLogViewer.tsx","./src/components/FeatureCard.tsx","./src/components/FeatureModal.tsx","./src/components/FolderBrowser.tsx","./src/components/KanbanBoard.tsx","./src/components/KanbanColumn.tsx","./src/components/NewProjectModal.tsx","./src/components/ProgressDashboard.tsx","./src/components/ProjectSelector.tsx","./src/components/QuestionOptions.tsx","./src/components/SetupWizard.tsx","./src/components/SpecCreationChat.tsx","./src/components/TypingIndicator.tsx","./src/hooks/useAssistantChat.ts","./src/hooks/useCelebration.ts","./src/hooks/useFeatureSound.ts","./src/hooks/useProjects.ts","./src/hooks/useSpecChat.ts","./src/hooks/useWebSocket.ts","./src/lib/api.ts","./src/lib/types.ts"],"version":"5.6.3"} \ No newline at end of file +{"root":["./src/App.tsx","./src/main.tsx","./src/vite-env.d.ts","./src/components/AddFeatureForm.tsx","./src/components/AgentControl.tsx","./src/components/AgentThought.tsx","./src/components/AssistantChat.tsx","./src/components/AssistantFAB.tsx","./src/components/AssistantPanel.tsx","./src/components/ChatMessage.tsx","./src/components/DebugLogViewer.tsx","./src/components/FeatureCard.tsx","./src/components/FeatureModal.tsx","./src/components/FolderBrowser.tsx","./src/components/KanbanBoard.tsx","./src/components/KanbanColumn.tsx","./src/components/NewProjectModal.tsx","./src/components/ParallelAgentControl.tsx","./src/components/ProgressDashboard.tsx","./src/components/ProjectSelector.tsx","./src/components/QuestionOptions.tsx","./src/components/SetupWizard.tsx","./src/components/SpecCreationChat.tsx","./src/components/TypingIndicator.tsx","./src/hooks/useAssistantChat.ts","./src/hooks/useCelebration.ts","./src/hooks/useFeatureSound.ts","./src/hooks/useProjects.ts","./src/hooks/useSpecChat.ts","./src/hooks/useWebSocket.ts","./src/lib/api.ts","./src/lib/types.ts"],"version":"5.6.3"} \ No newline at end of file From 75a99f28bd4d3437332f86d9d556a24f009712b9 Mon Sep 17 00:00:00 2001 From: Ofer Shaal Date: Wed, 7 Jan 2026 16:17:26 -0500 Subject: [PATCH 3/7] fix: Address CodeRabbitAI review comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix race condition in feature_claim_next by using AND logic instead of OR - Remove unnecessary empty-string checks from filter conditions - Make agent startup concurrent with asyncio.gather() for faster init - Add docstrings warning about SIGSTOP/SIGCONT limitations with asyncio - Fix AttributeError when accessing non-existent agent status - Improve git init error handling (return False on add/commit failure) - Better symlink failure handling with marker file fallback - Fix merge to checkout target branch before merging - Add *.tsbuildinfo to .gitignore and remove from tracking 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .gitignore | 1 + mcp_server/feature_mcp.py | 8 +++--- parallel_agents.py | 41 ++++++++++++++++++++++----- server/routers/parallel_agents.py | 3 +- ui/tsconfig.tsbuildinfo | 1 - worktree.py | 47 ++++++++++++++++++++++++------- 6 files changed, 78 insertions(+), 23 deletions(-) delete mode 100644 ui/tsconfig.tsbuildinfo diff --git a/.gitignore b/.gitignore index d14182cd..d5b4c9f1 100644 --- a/.gitignore +++ b/.gitignore @@ -98,6 +98,7 @@ Desktop.ini ui/dist/ ui/.vite/ .vite/ +*.tsbuildinfo # =================== # Environment files diff --git a/mcp_server/feature_mcp.py b/mcp_server/feature_mcp.py index 6d62a900..91551aae 100644 --- a/mcp_server/feature_mcp.py +++ b/mcp_server/feature_mcp.py @@ -166,7 +166,6 @@ def feature_get_next( if agent_id: query = query.filter( (Feature.assigned_agent_id == None) | - (Feature.assigned_agent_id == "") | (Feature.assigned_agent_id == agent_id) ) @@ -397,13 +396,14 @@ def feature_claim_next( session = get_session() try: # Find the next available feature not assigned to another agent + # A feature is available if: + # 1. Not in progress AND not assigned to anyone, OR + # 2. Already assigned to this agent (allow re-claiming own feature) feature = ( session.query(Feature) .filter(Feature.passes == False) .filter( - (Feature.in_progress == False) | - (Feature.assigned_agent_id == None) | - (Feature.assigned_agent_id == "") | + ((Feature.in_progress == False) & (Feature.assigned_agent_id == None)) | (Feature.assigned_agent_id == agent_id) ) .order_by(Feature.priority.asc(), Feature.id.asc()) diff --git a/parallel_agents.py b/parallel_agents.py index 780f1f7a..fb437f11 100644 --- a/parallel_agents.py +++ b/parallel_agents.py @@ -123,11 +123,19 @@ async def start_agents( logger.error("Failed to initialize git repository") return {self.generate_agent_id(i): False for i in range(num_agents)} - # Start agents - for i in range(num_agents): - agent_id = self.generate_agent_id(i) - success = await self.start_agent(agent_id, yolo_mode, model) - results[agent_id] = success + # Start agents concurrently for faster initialization + agent_ids = [self.generate_agent_id(i) for i in range(num_agents)] + tasks = [self.start_agent(agent_id, yolo_mode, model) for agent_id in agent_ids] + + # Gather results, allowing individual failures + start_results = await asyncio.gather(*tasks, return_exceptions=True) + + for agent_id, result in zip(agent_ids, start_results): + if isinstance(result, Exception): + logger.error(f"Failed to start {agent_id}: {result}") + results[agent_id] = False + else: + results[agent_id] = result return results @@ -283,7 +291,18 @@ async def stop_all_agents(self) -> None: await asyncio.gather(*tasks, return_exceptions=True) async def pause_agent(self, agent_id: str) -> bool: - """Pause an agent using psutil.""" + """ + Pause an agent using psutil suspend (SIGSTOP on Unix). + + WARNING: This uses SIGSTOP/SIGCONT which has known limitations: + - The entire process and asyncio event loop are frozen + - On resume, pending signals, timeouts, and callbacks fire in a burst + - Outstanding async operations (API requests, I/O) may timeout or fail + - Child watcher and signal handling may have unpredictable behavior + + For production use, consider stop/start semantics instead if the agent + has long-running async operations that could be interrupted. + """ agent = self.agents.get(agent_id) if not agent or not agent.process or agent.status != "running": return False @@ -299,7 +318,15 @@ async def pause_agent(self, agent_id: str) -> bool: return False async def resume_agent(self, agent_id: str) -> bool: - """Resume a paused agent.""" + """ + Resume a paused agent using psutil resume (SIGCONT on Unix). + + WARNING: See pause_agent docstring for important caveats about + SIGSTOP/SIGCONT and asyncio. After resume, the agent may experience: + - Burst of delayed timer/callback executions + - Potential connection timeouts or stale state + - Signal delivery ordering issues + """ agent = self.agents.get(agent_id) if not agent or not agent.process or agent.status != "paused": return False diff --git a/server/routers/parallel_agents.py b/server/routers/parallel_agents.py index adbf3901..dd3f532a 100644 --- a/server/routers/parallel_agents.py +++ b/server/routers/parallel_agents.py @@ -148,10 +148,11 @@ async def start_single_agent( success = await orchestrator.start_agent(agent_id, yolo_mode=yolo_mode) + agent = orchestrator.agents.get(agent_id) return { "success": success, "agent_id": agent_id, - "status": orchestrator.agents.get(agent_id, {}).status if agent_id in orchestrator.agents else "unknown", + "status": agent.status if agent else "unknown", } diff --git a/ui/tsconfig.tsbuildinfo b/ui/tsconfig.tsbuildinfo deleted file mode 100644 index ef4ee4eb..00000000 --- a/ui/tsconfig.tsbuildinfo +++ /dev/null @@ -1 +0,0 @@ -{"root":["./src/App.tsx","./src/main.tsx","./src/vite-env.d.ts","./src/components/AddFeatureForm.tsx","./src/components/AgentControl.tsx","./src/components/AgentThought.tsx","./src/components/AssistantChat.tsx","./src/components/AssistantFAB.tsx","./src/components/AssistantPanel.tsx","./src/components/ChatMessage.tsx","./src/components/DebugLogViewer.tsx","./src/components/FeatureCard.tsx","./src/components/FeatureModal.tsx","./src/components/FolderBrowser.tsx","./src/components/KanbanBoard.tsx","./src/components/KanbanColumn.tsx","./src/components/NewProjectModal.tsx","./src/components/ParallelAgentControl.tsx","./src/components/ProgressDashboard.tsx","./src/components/ProjectSelector.tsx","./src/components/QuestionOptions.tsx","./src/components/SetupWizard.tsx","./src/components/SpecCreationChat.tsx","./src/components/TypingIndicator.tsx","./src/hooks/useAssistantChat.ts","./src/hooks/useCelebration.ts","./src/hooks/useFeatureSound.ts","./src/hooks/useProjects.ts","./src/hooks/useSpecChat.ts","./src/hooks/useWebSocket.ts","./src/lib/api.ts","./src/lib/types.ts"],"version":"5.6.3"} \ No newline at end of file diff --git a/worktree.py b/worktree.py index 490850e7..f5ed3d3e 100644 --- a/worktree.py +++ b/worktree.py @@ -68,11 +68,13 @@ def ensure_git_repo(self) -> bool: # Create initial commit so worktrees can branch from it result = self._run_git("add", "-A") if result.returncode != 0: - logger.warning(f"Git add failed: {result.stderr}") + logger.error(f"Git add failed: {result.stderr}") + return False result = self._run_git("commit", "-m", "Initial commit for parallel agents", "--allow-empty") if result.returncode != 0: - logger.warning(f"Git commit failed: {result.stderr}") + logger.error(f"Git commit failed: {result.stderr}") + return False return True @@ -149,6 +151,10 @@ def _setup_shared_resources(self, worktree_path: Path) -> None: Set up shared resources in the worktree. The features.db is shared across all worktrees via symlink. + This is critical for atomic feature claiming via feature_claim_next(). + + Raises: + RuntimeError: If features.db cannot be shared with the worktree. """ # Create symlink to shared features.db features_db = self.project_dir / "features.db" @@ -159,9 +165,18 @@ def _setup_shared_resources(self, worktree_path: Path) -> None: # Try symlink first (preferred) worktree_db.symlink_to(features_db) logger.debug(f"Created symlink to features.db in {worktree_path}") - except OSError: - # Fallback: use the same path in MCP server config - logger.debug("Symlink not supported, MCP server will use main project path") + except OSError as e: + # Symlink failed - this breaks shared DB coordination + # The MCP server uses PROJECT_DIR env var which points to worktree, + # so we need the features.db to be accessible there + logger.warning( + f"Symlink creation failed ({e}). " + f"Ensure MCP server is configured to use {features_db} directly." + ) + # Write a marker file to indicate the absolute DB path + db_path_file = worktree_path / ".features_db_path" + db_path_file.write_text(str(features_db)) + logger.info(f"Wrote features.db path to {db_path_file}") # Copy prompts directory if it exists prompts_dir = self.project_dir / "prompts" @@ -268,16 +283,28 @@ def merge_worktree_changes(self, agent_id: str, commit_message: Optional[str] = msg = commit_message or f"Agent {agent_id} changes" self._run_git("commit", "-m", msg, "--allow-empty", cwd=worktree_path) - # Get current branch in main repo + # Get the target branch to merge into (default to "main") result = self._run_git("branch", "--show-current") - main_branch = result.stdout.strip() or "main" + target_branch = result.stdout.strip() or "main" - # Merge the agent branch into main + # Ensure we're on the target branch before merging + result = self._run_git("checkout", target_branch) + if result.returncode != 0: + logger.error(f"Failed to checkout {target_branch}: {result.stderr}") + return False + + # Merge the agent branch into the target branch result = self._run_git("merge", branch, "--no-edit") if result.returncode != 0: - logger.warning(f"Merge conflict for agent {agent_id}: {result.stderr}") - # Abort the merge + logger.warning( + f"Merge conflict for agent {agent_id}. " + f"Agent branch: {branch}. Error: {result.stderr}" + ) + logger.warning( + f"Manual resolution required. Worktree at: {worktree_path}" + ) + # Abort the merge to clean up self._run_git("merge", "--abort") return False From c90c769375051ce95a6e01bdc55c16d6d4204ab6 Mon Sep 17 00:00:00 2001 From: Ofer Shaal Date: Wed, 7 Jan 2026 16:39:59 -0500 Subject: [PATCH 4/7] fix: Address additional CodeRabbitAI review comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Use Path.is_relative_to() instead of string matching in worktree cleanup to prevent false positives from substring matches - Add missing docstrings to improve coverage above 80% threshold: - run_parallel() in autonomous_agent_demo.py - read_file(), write_file() in server/routers/projects.py - ConnectionManager.__init__() in server/websocket.py - status getter/setter, pid property in process_manager.py 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- autonomous_agent_demo.py | 1 + server/routers/projects.py | 2 ++ server/services/process_manager.py | 3 +++ server/websocket.py | 1 + worktree.py | 4 +++- 5 files changed, 10 insertions(+), 1 deletion(-) diff --git a/autonomous_agent_demo.py b/autonomous_agent_demo.py index 2e3a2f32..23343552 100644 --- a/autonomous_agent_demo.py +++ b/autonomous_agent_demo.py @@ -161,6 +161,7 @@ def main() -> None: ) async def run_parallel(): + """Run multiple agents in parallel and wait for completion.""" try: results = await orchestrator.start_agents( num_agents=num_agents, diff --git a/server/routers/projects.py b/server/routers/projects.py index d1c2b6c7..69b7d12c 100644 --- a/server/routers/projects.py +++ b/server/routers/projects.py @@ -277,6 +277,7 @@ async def get_project_prompts(name: str): prompts_dir = _get_project_prompts_dir(project_dir) def read_file(filename: str) -> str: + """Read a prompt file, returning empty string if not found.""" filepath = prompts_dir / filename if filepath.exists(): try: @@ -311,6 +312,7 @@ async def update_project_prompts(name: str, prompts: ProjectPromptsUpdate): prompts_dir.mkdir(parents=True, exist_ok=True) def write_file(filename: str, content: str | None): + """Write content to a prompt file if content is not None.""" if content is not None: filepath = prompts_dir / filename filepath.write_text(content, encoding="utf-8") diff --git a/server/services/process_manager.py b/server/services/process_manager.py index d2b4f0b3..8adade13 100644 --- a/server/services/process_manager.py +++ b/server/services/process_manager.py @@ -85,10 +85,12 @@ def __init__( @property def status(self) -> Literal["stopped", "running", "paused", "crashed"]: + """Get the current agent process status.""" return self._status @status.setter def status(self, value: Literal["stopped", "running", "paused", "crashed"]): + """Set status and notify callbacks if changed.""" old_status = self._status self._status = value if old_status != value: @@ -137,6 +139,7 @@ def remove_status_callback(self, callback: Callable[[str], Awaitable[None]]) -> @property def pid(self) -> int | None: + """Get the process ID of the running agent, or None if not running.""" return self.process.pid if self.process else None def _check_lock(self) -> bool: diff --git a/server/websocket.py b/server/websocket.py index 23139009..9437b477 100644 --- a/server/websocket.py +++ b/server/websocket.py @@ -51,6 +51,7 @@ class ConnectionManager: """Manages WebSocket connections per project.""" def __init__(self): + """Initialize the connection manager with empty connection registry.""" # project_name -> set of WebSocket connections self.active_connections: dict[str, Set[WebSocket]] = {} self._lock = asyncio.Lock() diff --git a/worktree.py b/worktree.py index f5ed3d3e..a41a4891 100644 --- a/worktree.py +++ b/worktree.py @@ -317,7 +317,9 @@ def cleanup_all_worktrees(self) -> None: for wt in worktrees: path = Path(wt.get("path", "")) - if path != self.project_dir and str(self.worktrees_dir) in str(path): + # Use is_relative_to() for robust path matching instead of string matching + # This avoids false positives from substring matches + if path != self.project_dir and path.is_relative_to(self.worktrees_dir): # Extract agent_id from path agent_id = path.name.replace("agent_", "") self.remove_worktree(agent_id) From 8372a370388904a9aa94499000dc9e9f494a7948 Mon Sep 17 00:00:00 2001 From: Ofer Shaal Date: Wed, 7 Jan 2026 17:27:46 -0500 Subject: [PATCH 5/7] fix: Address remaining CodeRabbitAI review comments - Add max_iterations parameter to parallel agent system (was ignored) - Improve health check to distinguish completion vs crash - Check fallback commit return code in worktree creation - Clarify symlink fallback behavior (env var is the actual solution) Co-Authored-By: Claude Opus 4.5 --- autonomous_agent_demo.py | 10 ++++++++++ parallel_agents.py | 9 ++++++++- server/routers/parallel_agents.py | 1 + server/schemas.py | 1 + worktree.py | 25 +++++++++++++++---------- 5 files changed, 35 insertions(+), 11 deletions(-) diff --git a/autonomous_agent_demo.py b/autonomous_agent_demo.py index 23343552..fa88afea 100644 --- a/autonomous_agent_demo.py +++ b/autonomous_agent_demo.py @@ -167,6 +167,7 @@ async def run_parallel(): num_agents=num_agents, yolo_mode=args.yolo, model=args.model, + max_iterations=args.max_iterations, ) print(f"\nStarted agents: {results}") @@ -175,7 +176,16 @@ async def run_parallel(): health = await orchestrator.healthcheck() running = sum(1 for v in health.values() if v) if running == 0: + # Distinguish between completion and crashes + statuses = orchestrator.get_all_statuses() + crashed = [s["agent_id"] for s in statuses if s["status"] == "crashed"] + stopped = [s["agent_id"] for s in statuses if s["status"] == "stopped"] + print("\nAll agents have finished.") + if crashed: + print(f" Crashed: {', '.join(crashed)}") + if stopped: + print(f" Completed: {', '.join(stopped)}") break await asyncio.sleep(5) diff --git a/parallel_agents.py b/parallel_agents.py index fb437f11..e20a7a6c 100644 --- a/parallel_agents.py +++ b/parallel_agents.py @@ -103,6 +103,7 @@ async def start_agents( num_agents: int, yolo_mode: bool = False, model: str = "claude-sonnet-4-5-20250929", + max_iterations: Optional[int] = None, ) -> dict[str, bool]: """ Start multiple agents in parallel. @@ -111,6 +112,7 @@ async def start_agents( num_agents: Number of agents to start (capped at max_agents) yolo_mode: Enable YOLO mode (no browser testing) model: Claude model to use + max_iterations: Maximum iterations per agent (default: unlimited) Returns: Dict mapping agent_id to success status @@ -125,7 +127,7 @@ async def start_agents( # Start agents concurrently for faster initialization agent_ids = [self.generate_agent_id(i) for i in range(num_agents)] - tasks = [self.start_agent(agent_id, yolo_mode, model) for agent_id in agent_ids] + tasks = [self.start_agent(agent_id, yolo_mode, model, max_iterations) for agent_id in agent_ids] # Gather results, allowing individual failures start_results = await asyncio.gather(*tasks, return_exceptions=True) @@ -144,6 +146,7 @@ async def start_agent( agent_id: str, yolo_mode: bool = False, model: str = "claude-sonnet-4-5-20250929", + max_iterations: Optional[int] = None, ) -> bool: """ Start a single agent. @@ -152,6 +155,7 @@ async def start_agent( agent_id: Unique agent identifier yolo_mode: Enable YOLO mode model: Claude model to use + max_iterations: Maximum iterations (default: unlimited) Returns: True if started successfully @@ -179,6 +183,9 @@ async def start_agent( if yolo_mode: cmd.append("--yolo") + if max_iterations is not None: + cmd.extend(["--max-iterations", str(max_iterations)]) + try: process = subprocess.Popen( cmd, diff --git a/server/routers/parallel_agents.py b/server/routers/parallel_agents.py index dd3f532a..e8d7ac61 100644 --- a/server/routers/parallel_agents.py +++ b/server/routers/parallel_agents.py @@ -102,6 +102,7 @@ async def start_parallel_agents( results = await orchestrator.start_agents( num_agents=request.num_agents, yolo_mode=request.yolo_mode, + max_iterations=request.max_iterations, ) all_success = all(results.values()) diff --git a/server/schemas.py b/server/schemas.py index c2ce6b2b..0ea685fc 100644 --- a/server/schemas.py +++ b/server/schemas.py @@ -129,6 +129,7 @@ class ParallelAgentStartRequest(BaseModel): """Request schema for starting parallel agents.""" num_agents: int = Field(default=2, ge=1, le=10, description="Number of agents to start") yolo_mode: bool = False + max_iterations: int | None = Field(default=None, ge=1, description="Maximum iterations per agent") class ParallelAgentInfo(BaseModel): diff --git a/worktree.py b/worktree.py index a41a4891..bb1e53a9 100644 --- a/worktree.py +++ b/worktree.py @@ -111,10 +111,13 @@ def create_worktree(self, agent_id: str, branch_name: Optional[str] = None) -> O result = self._run_git("rev-parse", "HEAD") if result.returncode != 0: # No commits yet, create one - self._run_git("commit", "--allow-empty", "-m", "Initial commit") + commit_result = self._run_git("commit", "--allow-empty", "-m", "Initial commit") + if commit_result.returncode != 0: + logger.error(f"Failed to create initial commit: {commit_result.stderr}") + return None result = self._run_git("rev-parse", "HEAD") if result.returncode != 0: - logger.error("Failed to get HEAD ref") + logger.error(f"Failed to get HEAD ref after initial commit: {result.stderr}") return None base_ref = result.stdout.strip() @@ -166,17 +169,19 @@ def _setup_shared_resources(self, worktree_path: Path) -> None: worktree_db.symlink_to(features_db) logger.debug(f"Created symlink to features.db in {worktree_path}") except OSError as e: - # Symlink failed - this breaks shared DB coordination - # The MCP server uses PROJECT_DIR env var which points to worktree, - # so we need the features.db to be accessible there - logger.warning( - f"Symlink creation failed ({e}). " - f"Ensure MCP server is configured to use {features_db} directly." + # Symlink failed (common on Windows without admin privileges) + # This is OK because parallel_agent_runner.py sets PROJECT_DIR + # environment variable to the main project directory, and the + # MCP server uses that env var to locate features.db + logger.info( + f"Symlink creation failed ({e}), but this is OK - " + f"MCP server uses PROJECT_DIR env var to access {features_db}" ) - # Write a marker file to indicate the absolute DB path + # Write a marker file for debugging purposes only + # (not read by MCP server, just useful for troubleshooting) db_path_file = worktree_path / ".features_db_path" db_path_file.write_text(str(features_db)) - logger.info(f"Wrote features.db path to {db_path_file}") + logger.debug(f"Wrote features.db path marker to {db_path_file}") # Copy prompts directory if it exists prompts_dir = self.project_dir / "prompts" From 8cf2029c88f77b49b4cde1610dfe05799e90f947 Mon Sep 17 00:00:00 2001 From: Ofer Shaal Date: Wed, 7 Jan 2026 23:14:52 -0500 Subject: [PATCH 6/7] feat: Improve parallel agents with atomic claiming and auto-merge - Add parallel agent mode instructions to coding prompt template - Update prompt to use feature_claim_next(agent_id) for atomic claiming - Add worktree-safe auto-merge instructions after each feature - Increase max agents from 3 to 10 (backend and UI) - Add colors for agents 6-10 in FeatureCard and ParallelAgentControl - Fix React component identity bug causing double-click on Parallel button Co-Authored-By: Claude Opus 4.5 --- .claude/templates/coding_prompt.template.md | 67 ++++++++++++++++++++- parallel_agents.py | 2 +- ui/src/App.tsx | 24 +++----- ui/src/components/FeatureCard.tsx | 5 ++ ui/src/components/ParallelAgentControl.tsx | 11 +++- 5 files changed, 88 insertions(+), 21 deletions(-) diff --git a/.claude/templates/coding_prompt.template.md b/.claude/templates/coding_prompt.template.md index 6da10a2f..c48680b4 100644 --- a/.claude/templates/coding_prompt.template.md +++ b/.claude/templates/coding_prompt.template.md @@ -3,6 +3,20 @@ You are continuing work on a long-running autonomous development task. This is a FRESH context window - you have no memory of previous sessions. +### PARALLEL AGENT MODE + +Check if you're running as a parallel agent: + +```bash +echo "AGENT_ID: $AGENT_ID" +``` + +**If AGENT_ID is set (e.g., "agent-1", "agent-2"):** +- You are one of multiple agents working in parallel on this project +- You MUST use `feature_claim_next` instead of `feature_get_next` to avoid conflicts +- Other agents are working in separate git worktrees on the same codebase +- The features database is shared - atomic claiming prevents race conditions + ### STEP 1: GET YOUR BEARINGS (MANDATORY) Start by orienting yourself: @@ -96,6 +110,17 @@ Features are **test cases** that drive development. This is test-driven developm Get the next feature to implement: +**If running in PARALLEL MODE (AGENT_ID is set):** + +``` +# Atomically claim the next feature (prevents race conditions with other agents) +Use the feature_claim_next tool with agent_id=$AGENT_ID +``` + +This single call gets AND claims the feature atomically - no need to call `feature_mark_in_progress` separately. + +**If running in SINGLE AGENT MODE (AGENT_ID is not set):** + ``` # Get the highest-priority pending feature Use the feature_get_next tool @@ -273,7 +298,7 @@ Use the feature_mark_passing tool with feature_id=42 **ONLY MARK A FEATURE AS PASSING AFTER VERIFICATION WITH SCREENSHOTS.** -### STEP 8: COMMIT YOUR PROGRESS +### STEP 8: COMMIT AND MERGE YOUR PROGRESS Make a descriptive git commit: @@ -284,10 +309,41 @@ git commit -m "Implement [feature name] - verified end-to-end - Added [specific changes] - Tested with browser automation - Marked feature #X as passing -- Screenshots in verification/ directory " ``` +**If running in PARALLEL MODE (AGENT_ID is set), merge your changes immediately:** + +After each completed feature, integrate your changes so other agents can benefit. + +**IMPORTANT:** You're in a git worktree, so you cannot checkout the main branch directly. +Use this worktree-safe approach: + +```bash +# 1. Determine the main branch name +MAIN_BRANCH=$(cd "$(git rev-parse --git-common-dir)/.." && git branch --show-current 2>/dev/null || echo "main") + +# 2. Pull latest main into your branch (get others' changes first) +git fetch origin $MAIN_BRANCH 2>/dev/null || true +git merge origin/$MAIN_BRANCH --no-edit 2>/dev/null || git merge FETCH_HEAD --no-edit 2>/dev/null || true + +# 3. Push your branch to main (worktree-safe merge) +git push . HEAD:$MAIN_BRANCH +``` + +The `git push . HEAD:main` command merges your current branch into main without needing to checkout main. + +**If merge conflicts occur:** +- On step 2 (pulling main): Resolve conflicts, then `git add . && git commit -m "Merge main"` +- On step 3 (pushing to main): This means main has changes not in your branch + - Run step 2 again to get those changes, then retry step 3 +- If complex conflicts: Skip merging for now, continue working. Use UI "Merge" button later. + +**Why merge after each feature?** +- Other agents immediately get your improvements +- Prevents large conflict pile-ups at the end +- Each agent stays closer to the "true" state of the project + ### STEP 9: UPDATE PROGRESS NOTES Update `claude-progress.txt` with: @@ -369,9 +425,13 @@ The feature tools exist to reduce token usage. **DO NOT make exploratory queries feature_get_stats # 2. Get the NEXT feature to work on (one feature only) +# - SINGLE AGENT MODE: Use feature_get_next +# - PARALLEL MODE: Use feature_claim_next with agent_id=$AGENT_ID (atomic claim) feature_get_next +feature_claim_next with agent_id={agent_id} # PARALLEL MODE ONLY # 3. Mark a feature as in-progress (call immediately after feature_get_next) +# NOTE: Skip this if you used feature_claim_next (it claims automatically) feature_mark_in_progress with feature_id={id} # 4. Get up to 3 random passing features for regression testing @@ -385,6 +445,9 @@ feature_skip with feature_id={id} # 7. Clear in-progress status (when abandoning a feature) feature_clear_in_progress with feature_id={id} + +# 8. Release a feature back to the queue (PARALLEL MODE - if you can't complete it) +feature_release with feature_id={id} agent_id={agent_id} ``` ### RULES: diff --git a/parallel_agents.py b/parallel_agents.py index e20a7a6c..079aaa20 100644 --- a/parallel_agents.py +++ b/parallel_agents.py @@ -48,7 +48,7 @@ def __init__( self, project_dir: Path, root_dir: Path, - max_agents: int = 3, + max_agents: int = 10, ): """ Initialize the orchestrator. diff --git a/ui/src/App.tsx b/ui/src/App.tsx index 2d85a108..3ac7614d 100644 --- a/ui/src/App.tsx +++ b/ui/src/App.tsx @@ -162,24 +162,18 @@ function App() { - {!parallelMode ? ( - <> - - - - ) : ( - )} + )} diff --git a/ui/src/components/FeatureCard.tsx b/ui/src/components/FeatureCard.tsx index 734d04c2..69b6c3ef 100644 --- a/ui/src/components/FeatureCard.tsx +++ b/ui/src/components/FeatureCard.tsx @@ -35,6 +35,11 @@ function getAgentColor(agentId: string): string { '#8338ec', // purple - agent-3 '#ff5400', // orange - agent-4 '#ff006e', // pink - agent-5 + '#3a86ff', // blue - agent-6 + '#ffd60a', // yellow - agent-7 + '#06d6a0', // teal - agent-8 + '#ef476f', // red - agent-9 + '#118ab2', // dark cyan - agent-10 ] // Extract number from agent-N format diff --git a/ui/src/components/ParallelAgentControl.tsx b/ui/src/components/ParallelAgentControl.tsx index c188fa4f..10c598a4 100644 --- a/ui/src/components/ParallelAgentControl.tsx +++ b/ui/src/components/ParallelAgentControl.tsx @@ -32,6 +32,11 @@ const AGENT_COLORS = [ '#8338ec', // purple - agent-3 '#ff5400', // orange - agent-4 '#ff006e', // pink - agent-5 + '#3a86ff', // blue - agent-6 + '#ffd60a', // yellow - agent-7 + '#06d6a0', // teal - agent-8 + '#ef476f', // red - agent-9 + '#118ab2', // dark cyan - agent-10 ] function getAgentColor(agentId: string): string { @@ -123,7 +128,7 @@ export function ParallelAgentControl({ {/* Agent Status Grid */} {status && status.agents.length > 0 && ( -
+
{status.agents.map((agent) => ( ))} @@ -146,9 +151,9 @@ export function ParallelAgentControl({ {numAgents} From c9ed9601c5be75c28b8eecdd084db1ae161a01bd Mon Sep 17 00:00:00 2001 From: Ofer Shaal Date: Wed, 7 Jan 2026 23:47:47 -0500 Subject: [PATCH 7/7] fix: Display agent badges on feature cards and add pause/resume for parallel agents - Fix assigned_agent_id not being returned in features API response - Add pause/resume endpoints for parallel agents (/parallel-agents/pause, /resume) - MCP server auto-claims features when AGENT_ID env var is set - UI shows which agent is working on each feature with colored badges - ParallelAgentControl panel now includes status indicator, YOLO toggle, and pause/resume/stop controls Co-Authored-By: Claude Opus 4.5 --- mcp_server/feature_mcp.py | 15 ++ server/routers/features.py | 1 + server/routers/parallel_agents.py | 48 ++++++ ui/src/components/ParallelAgentControl.tsx | 182 ++++++++++++++++----- ui/src/hooks/useProjects.ts | 24 +++ ui/src/lib/api.ts | 16 ++ 6 files changed, 241 insertions(+), 45 deletions(-) diff --git a/mcp_server/feature_mcp.py b/mcp_server/feature_mcp.py index 91551aae..ef0ce903 100644 --- a/mcp_server/feature_mcp.py +++ b/mcp_server/feature_mcp.py @@ -151,13 +151,21 @@ def feature_get_next( and is not currently being worked on by another agent. Use this at the start of each coding session to determine what to implement next. + NOTE: In parallel agent mode (AGENT_ID env var set), this automatically claims + the feature to prevent race conditions with other agents. + Args: agent_id: Optional agent ID. If provided, excludes features assigned to other agents. + Auto-detected from AGENT_ID environment variable if not provided. Returns: JSON with feature details (id, priority, category, name, description, steps, passes, in_progress, assigned_agent_id) or error message if all features are passing or assigned. """ + # Auto-detect agent_id from environment if not provided + if not agent_id: + agent_id = os.environ.get("AGENT_ID", "") + session = get_session() try: query = session.query(Feature).filter(Feature.passes == False) @@ -174,6 +182,13 @@ def feature_get_next( if feature is None: return json.dumps({"error": "All features are passing or assigned to other agents! No more work to do."}) + # In parallel mode, automatically claim the feature to prevent race conditions + if agent_id and not feature.in_progress: + feature.in_progress = True + feature.assigned_agent_id = agent_id + session.commit() + session.refresh(feature) + return json.dumps(feature.to_dict(), indent=2) finally: session.close() diff --git a/server/routers/features.py b/server/routers/features.py index 3329a68f..0d463487 100644 --- a/server/routers/features.py +++ b/server/routers/features.py @@ -90,6 +90,7 @@ def feature_to_response(f) -> FeatureResponse: steps=f.steps if isinstance(f.steps, list) else [], passes=f.passes, in_progress=f.in_progress, + assigned_agent_id=f.assigned_agent_id, ) diff --git a/server/routers/parallel_agents.py b/server/routers/parallel_agents.py index e8d7ac61..899356af 100644 --- a/server/routers/parallel_agents.py +++ b/server/routers/parallel_agents.py @@ -138,6 +138,54 @@ async def stop_all_parallel_agents(project_name: str): ) +@router.post("/pause", response_model=ParallelAgentActionResponse) +async def pause_all_parallel_agents(project_name: str): + """Pause all running parallel agents for a project.""" + orchestrator = get_orchestrator(project_name) + + running_agents = [ + aid for aid, agent in orchestrator.agents.items() + if agent.status == "running" + ] + + results = {} + for aid in running_agents: + success = await orchestrator.pause_agent(aid) + results[aid] = success + + paused_count = sum(1 for v in results.values() if v) + + return ParallelAgentActionResponse( + success=paused_count == len(running_agents), + agents=results, + message=f"Paused {paused_count}/{len(running_agents)} agents", + ) + + +@router.post("/resume", response_model=ParallelAgentActionResponse) +async def resume_all_parallel_agents(project_name: str): + """Resume all paused parallel agents for a project.""" + orchestrator = get_orchestrator(project_name) + + paused_agents = [ + aid for aid, agent in orchestrator.agents.items() + if agent.status == "paused" + ] + + results = {} + for aid in paused_agents: + success = await orchestrator.resume_agent(aid) + results[aid] = success + + resumed_count = sum(1 for v in results.values() if v) + + return ParallelAgentActionResponse( + success=resumed_count == len(paused_agents), + agents=results, + message=f"Resumed {resumed_count}/{len(paused_agents)} agents", + ) + + @router.post("/{agent_id}/start") async def start_single_agent( project_name: str, diff --git a/ui/src/components/ParallelAgentControl.tsx b/ui/src/components/ParallelAgentControl.tsx index 10c598a4..7691fcb0 100644 --- a/ui/src/components/ParallelAgentControl.tsx +++ b/ui/src/components/ParallelAgentControl.tsx @@ -1,12 +1,12 @@ import { useState } from 'react' import { Play, + Pause, Square, Loader2, Zap, Users, Bot, - GitMerge, Trash2, Minus, Plus, @@ -15,7 +15,8 @@ import { useParallelAgentsStatus, useStartParallelAgents, useStopParallelAgents, - useMergeParallelWorktrees, + usePauseParallelAgents, + useResumeParallelAgents, useCleanupParallelAgents, } from '../hooks/useProjects' import type { ParallelAgentInfo } from '../lib/types' @@ -62,16 +63,27 @@ export function ParallelAgentControl({ const startAgents = useStartParallelAgents(projectName) const stopAgents = useStopParallelAgents(projectName) - const mergeWorktrees = useMergeParallelWorktrees(projectName) + const pauseAgents = usePauseParallelAgents(projectName) + const resumeAgents = useResumeParallelAgents(projectName) const cleanupAgents = useCleanupParallelAgents(projectName) const isLoading = startAgents.isPending || stopAgents.isPending || - mergeWorktrees.isPending || + pauseAgents.isPending || + resumeAgents.isPending || cleanupAgents.isPending - const hasRunningAgents = (status?.total_running ?? 0) > 0 + const runningCount = status?.agents.filter(a => a.status === 'running').length ?? 0 + const pausedCount = status?.agents.filter(a => a.status === 'paused').length ?? 0 + const hasAgents = (status?.agents.length ?? 0) > 0 + const hasRunningAgents = runningCount > 0 + const hasPausedAgents = pausedCount > 0 + const allPaused = hasAgents && pausedCount === status?.agents.length + const allStopped = hasAgents && runningCount === 0 && pausedCount === 0 + + // Show start controls when no agents exist OR all agents are stopped + const showStartControls = !hasAgents || allStopped const handleToggleMode = () => { const newMode = !isParallelMode @@ -87,14 +99,19 @@ export function ParallelAgentControl({ stopAgents.mutate() } - const handleMerge = () => { - mergeWorktrees.mutate() + const handlePause = () => { + pauseAgents.mutate() + } + + const handleResume = () => { + resumeAgents.mutate() } const handleCleanup = () => { cleanupAgents.mutate() } + // Collapsed state - just show button if (!isParallelMode) { return (
@@ -177,36 +217,53 @@ export function ParallelAgentControl({ disabled={isLoading} className="neo-btn neo-btn-success text-sm py-2 px-4 flex items-center justify-center gap-2" > - {isLoading ? ( + {startAgents.isPending ? ( ) : ( <> - Start {numAgents} Agents + Start {numAgents} Agent{numAgents !== 1 ? 's' : ''} )}
) : ( + // Has agents - show control buttons
- {/* Running indicator */} -
- - - {status?.total_running} agent{status?.total_running !== 1 ? 's' : ''} running - - {yoloEnabled && ( -
- - YOLO -
- )} -
- - {/* Action Buttons */} + {/* Primary Action Buttons */}
+ {/* Pause/Resume Button */} + {hasRunningAgents && !allPaused ? ( + + ) : hasPausedAgents ? ( + + ) : null} + + {/* Stop Button */} - +
+ + {/* Secondary Action Button */} +
@@ -259,6 +308,49 @@ export function ParallelAgentControl({ ) } +function StatusIndicator({ + runningCount, + pausedCount, + totalCount +}: { + runningCount: number + pausedCount: number + totalCount: number +}) { + const stoppedCount = totalCount - runningCount - pausedCount + + let color: string + let label: string + let pulse = false + + if (runningCount > 0) { + color = 'var(--color-neo-done)' + label = `${runningCount} Running` + pulse = true + } else if (pausedCount > 0) { + color = 'var(--color-neo-pending)' + label = `${pausedCount} Paused` + } else { + color = 'var(--color-neo-text-secondary)' + label = `${stoppedCount} Stopped` + } + + return ( +
+ + + {label} + +
+ ) +} + function AgentStatusBadge({ agent }: { agent: ParallelAgentInfo }) { const color = getAgentColor(agent.agent_id) const statusColors: Record = { diff --git a/ui/src/hooks/useProjects.ts b/ui/src/hooks/useProjects.ts index 19f1de5a..387d058b 100644 --- a/ui/src/hooks/useProjects.ts +++ b/ui/src/hooks/useProjects.ts @@ -189,6 +189,30 @@ export function useStopParallelAgents(projectName: string) { }) } +export function usePauseParallelAgents(projectName: string) { + const queryClient = useQueryClient() + + return useMutation({ + mutationFn: () => api.pauseParallelAgents(projectName), + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: ['parallel-agents-status', projectName] }) + queryClient.invalidateQueries({ queryKey: ['agent-status', projectName] }) + }, + }) +} + +export function useResumeParallelAgents(projectName: string) { + const queryClient = useQueryClient() + + return useMutation({ + mutationFn: () => api.resumeParallelAgents(projectName), + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: ['parallel-agents-status', projectName] }) + queryClient.invalidateQueries({ queryKey: ['agent-status', projectName] }) + }, + }) +} + export function useMergeParallelWorktrees(projectName: string) { const queryClient = useQueryClient() diff --git a/ui/src/lib/api.ts b/ui/src/lib/api.ts index 9e6685cd..725c4c2b 100644 --- a/ui/src/lib/api.ts +++ b/ui/src/lib/api.ts @@ -178,6 +178,22 @@ export async function stopParallelAgents( }) } +export async function pauseParallelAgents( + projectName: string +): Promise { + return fetchJSON(`/projects/${encodeURIComponent(projectName)}/parallel-agents/pause`, { + method: 'POST', + }) +} + +export async function resumeParallelAgents( + projectName: string +): Promise { + return fetchJSON(`/projects/${encodeURIComponent(projectName)}/parallel-agents/resume`, { + method: 'POST', + }) +} + export async function mergeParallelWorktrees( projectName: string ): Promise<{ success: boolean; agents: Record; message: string }> {