From bcc717b7dbf50215f8d7cb80da69fb291c36510f Mon Sep 17 00:00:00 2001 From: seocombat Date: Tue, 6 Jan 2026 23:09:42 +0100 Subject: [PATCH 1/9] fix: Windows command line length limit errors On Windows, command line arguments are limited to ~8KB. This caused "The command line is too long" errors when: 1. client.py passed **os.environ to MCP server config (PATH alone can exceed 2KB on Windows) 2. spec_chat_session.py passed 18KB skill content as system_prompt Changes: - client.py: Remove env from MCP config, pass --project-dir as CLI arg - mcp_server/feature_mcp.py: Add argparse support for --project-dir (with fallback to PROJECT_DIR env var for backward compatibility) - spec_chat_session.py: Use short system_prompt, pass skill content in first message instead Tested on Windows 10 with long PATH environment variable. --- client.py | 16 ++++++++-------- mcp_server/feature_mcp.py | 28 ++++++++++++++++++++++++++-- server/services/spec_chat_session.py | 13 +++++++++---- 3 files changed, 43 insertions(+), 14 deletions(-) diff --git a/client.py b/client.py index 55747913..09c45175 100644 --- a/client.py +++ b/client.py @@ -160,17 +160,17 @@ def create_client(project_dir: Path, model: str, yolo_mode: bool = False): print(" - Warning: System Claude CLI not found, using bundled CLI") # Build MCP servers config - features is always included, playwright only in standard mode + # NOTE: Don't pass env at all - the subprocess inherits the parent environment automatically. + # Passing env (especially **os.environ) exceeds Windows command line limits (~8KB). + # Instead, we pass PROJECT_DIR as a command line argument. mcp_servers = { "features": { "command": sys.executable, # Use the same Python that's running this script - "args": ["-m", "mcp_server.feature_mcp"], - "env": { - # Inherit parent environment (PATH, ANTHROPIC_API_KEY, etc.) 
- **os.environ, - # Add custom variables - "PROJECT_DIR": str(project_dir.resolve()), - "PYTHONPATH": str(Path(__file__).parent.resolve()), - }, + "args": [ + "-m", "mcp_server.feature_mcp", + "--project-dir", str(project_dir.resolve()), + ], + # No env - subprocess inherits parent environment }, } if not yolo_mode: diff --git a/mcp_server/feature_mcp.py b/mcp_server/feature_mcp.py index 8c5f3c83..25a90749 100644 --- a/mcp_server/feature_mcp.py +++ b/mcp_server/feature_mcp.py @@ -17,6 +17,7 @@ - feature_create_bulk: Create multiple features at once """ +import argparse import json import os import sys @@ -34,8 +35,31 @@ from api.database import Feature, create_database from api.migration import migrate_json_to_sqlite -# Configuration from environment -PROJECT_DIR = Path(os.environ.get("PROJECT_DIR", ".")).resolve() + +def get_project_dir() -> Path: + """Get project directory from CLI args or environment variable. + + Supports both --project-dir CLI argument (preferred on Windows to avoid + command line length limits) and PROJECT_DIR environment variable. 
+ """ + parser = argparse.ArgumentParser(description="Feature MCP Server") + parser.add_argument( + "--project-dir", + type=str, + default=None, + help="Project directory path" + ) + args, _ = parser.parse_known_args() + + if args.project_dir: + return Path(args.project_dir).resolve() + + # Fall back to environment variable + return Path(os.environ.get("PROJECT_DIR", ".")).resolve() + + +# Configuration - supports both CLI args and environment variable +PROJECT_DIR = get_project_dir() # Pydantic models for input validation diff --git a/server/services/spec_chat_session.py b/server/services/spec_chat_session.py index 1d59532c..8c70306c 100644 --- a/server/services/spec_chat_session.py +++ b/server/services/spec_chat_session.py @@ -72,6 +72,7 @@ def __init__(self, project_name: str, project_dir: Path): self.created_at = datetime.now() self._conversation_id: Optional[str] = None self._client_entered: bool = False # Track if context manager is active + self._skill_content: str = "" # Loaded in start() async def close(self) -> None: """Clean up resources and close the Claude client.""" @@ -138,18 +139,20 @@ async def start(self) -> AsyncGenerator[dict, None]: # Replace $ARGUMENTS with absolute project path (like CLI does in start.py:184) # Using absolute path avoids confusion when project folder name differs from app name project_path = str(self.project_dir.resolve()) - system_prompt = skill_content.replace("$ARGUMENTS", project_path) + self._skill_content = skill_content.replace("$ARGUMENTS", project_path) # Create Claude SDK client with limited tools for spec creation # Use Opus for best quality spec generation # Use system CLI to avoid bundled Bun runtime crash (exit code 3) on Windows + # NOTE: We use a short system_prompt to avoid Windows command line length limits (~8KB) + # The full skill content (~18KB) is passed in the first message instead system_cli = shutil.which("claude") try: self.client = ClaudeSDKClient( options=ClaudeAgentOptions( 
model="claude-opus-4-5-20251101", cli_path=system_cli, - system_prompt=system_prompt, + system_prompt="You are an expert at creating detailed application specifications. Follow the instructions provided in the user's first message.", allowed_tools=[ "Read", "Write", @@ -173,9 +176,11 @@ async def start(self) -> AsyncGenerator[dict, None]: } return - # Start the conversation - Claude will send the Phase 1 greeting + # Start the conversation - send the full skill content as the first message + # This avoids Windows command line length limits by not using system_prompt try: - async for chunk in self._query_claude("Begin the spec creation process."): + initial_message = f"{self._skill_content}\n\n---\n\nBegin the spec creation process." + async for chunk in self._query_claude(initial_message): yield chunk # Signal that the response is complete (for UI to hide loading indicator) yield {"type": "response_done"} From 03d985eecd998da5e46e027e3bebf3bed2260bb8 Mon Sep 17 00:00:00 2001 From: seocombat Date: Tue, 6 Jan 2026 23:15:29 +0100 Subject: [PATCH 2/9] fix: Also fix assistant_chat_session.py Windows command line length --- server/services/assistant_chat_session.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/server/services/assistant_chat_session.py b/server/services/assistant_chat_session.py index 8c648fbd..b91dbb9f 100644 --- a/server/services/assistant_chat_session.py +++ b/server/services/assistant_chat_session.py @@ -168,15 +168,15 @@ async def start(self) -> AsyncGenerator[dict, None]: json.dump(security_settings, f, indent=2) # Build MCP servers config - only features MCP for read-only access + # NOTE: Don't pass env - subprocess inherits parent environment. + # Passing **os.environ exceeds Windows command line limits (~8KB). 
mcp_servers = { "features": { "command": sys.executable, - "args": ["-m", "mcp_server.feature_mcp"], - "env": { - **os.environ, - "PROJECT_DIR": str(self.project_dir.resolve()), - "PYTHONPATH": str(ROOT_DIR.resolve()), - }, + "args": [ + "-m", "mcp_server.feature_mcp", + "--project-dir", str(self.project_dir.resolve()), + ], }, } From 7144ead88b262cef3b5bf7d2e82a0174923d9a92 Mon Sep 17 00:00:00 2001 From: seocombat Date: Tue, 6 Jan 2026 23:20:54 +0100 Subject: [PATCH 3/9] fix: Simplify MCP env handling - don't specify env to inherit parent --- client.py | 7 +++---- server/services/assistant_chat_session.py | 4 ++-- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/client.py b/client.py index 09c45175..8a15760a 100644 --- a/client.py +++ b/client.py @@ -160,9 +160,9 @@ def create_client(project_dir: Path, model: str, yolo_mode: bool = False): print(" - Warning: System Claude CLI not found, using bundled CLI") # Build MCP servers config - features is always included, playwright only in standard mode - # NOTE: Don't pass env at all - the subprocess inherits the parent environment automatically. - # Passing env (especially **os.environ) exceeds Windows command line limits (~8KB). - # Instead, we pass PROJECT_DIR as a command line argument. + # NOTE: Don't specify env - let subprocess inherit parent environment. + # This avoids Windows command line limits while keeping necessary env vars. + # PROJECT_DIR is passed as CLI argument instead of env var. 
mcp_servers = { "features": { "command": sys.executable, # Use the same Python that's running this script @@ -170,7 +170,6 @@ def create_client(project_dir: Path, model: str, yolo_mode: bool = False): "-m", "mcp_server.feature_mcp", "--project-dir", str(project_dir.resolve()), ], - # No env - subprocess inherits parent environment }, } if not yolo_mode: diff --git a/server/services/assistant_chat_session.py b/server/services/assistant_chat_session.py index b91dbb9f..3afe3052 100644 --- a/server/services/assistant_chat_session.py +++ b/server/services/assistant_chat_session.py @@ -168,8 +168,8 @@ async def start(self) -> AsyncGenerator[dict, None]: json.dump(security_settings, f, indent=2) # Build MCP servers config - only features MCP for read-only access - # NOTE: Don't pass env - subprocess inherits parent environment. - # Passing **os.environ exceeds Windows command line limits (~8KB). + # NOTE: Don't specify env - let subprocess inherit parent environment. + # This avoids Windows command line limits while keeping necessary env vars. mcp_servers = { "features": { "command": sys.executable, From 0bc18147a62e6ff8efed005a812f94f28fcb9623 Mon Sep 17 00:00:00 2001 From: seocombat Date: Tue, 6 Jan 2026 23:45:53 +0100 Subject: [PATCH 4/9] Fix assistant initialization timeout on Windows MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Use short system_prompt to avoid Windows command line limits - Pass project context in first message instead (like spec_chat_session) - Use absolute path for MCP server script to fix module resolution - Add ROOT_DIR constant to client.py for consistent path handling The long system_prompt (6331 chars) was causing Claude CLI initialization to timeout on Windows. This follows the same pattern as spec_chat_session.py which uses a short system_prompt and passes content in the first message. 
🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- client.py | 8 ++++-- server/services/assistant_chat_session.py | 35 +++++++++++++++++------ 2 files changed, 32 insertions(+), 11 deletions(-) diff --git a/client.py b/client.py index 8a15760a..a644d8c4 100644 --- a/client.py +++ b/client.py @@ -12,6 +12,9 @@ from pathlib import Path from claude_agent_sdk import ClaudeAgentOptions, ClaudeSDKClient + +# Root directory of the autocoder project (where this file lives) +ROOT_DIR = Path(__file__).parent.resolve() from claude_agent_sdk.types import HookMatcher from security import bash_security_hook @@ -162,12 +165,13 @@ def create_client(project_dir: Path, model: str, yolo_mode: bool = False): # Build MCP servers config - features is always included, playwright only in standard mode # NOTE: Don't specify env - let subprocess inherit parent environment. # This avoids Windows command line limits while keeping necessary env vars. - # PROJECT_DIR is passed as CLI argument instead of env var. + # IMPORTANT: Use absolute path to feature_mcp.py since cwd may be different from autocoder root + mcp_script = ROOT_DIR / "mcp_server" / "feature_mcp.py" mcp_servers = { "features": { "command": sys.executable, # Use the same Python that's running this script "args": [ - "-m", "mcp_server.feature_mcp", + str(mcp_script), "--project-dir", str(project_dir.resolve()), ], }, diff --git a/server/services/assistant_chat_session.py b/server/services/assistant_chat_session.py index 3afe3052..10e86905 100644 --- a/server/services/assistant_chat_session.py +++ b/server/services/assistant_chat_session.py @@ -170,21 +170,28 @@ async def start(self) -> AsyncGenerator[dict, None]: # Build MCP servers config - only features MCP for read-only access # NOTE: Don't specify env - let subprocess inherit parent environment. # This avoids Windows command line limits while keeping necessary env vars. 
+ # IMPORTANT: Use absolute path to feature_mcp.py since cwd may be different from autocoder root + mcp_script = ROOT_DIR / "mcp_server" / "feature_mcp.py" mcp_servers = { "features": { "command": sys.executable, "args": [ - "-m", "mcp_server.feature_mcp", + str(mcp_script.resolve()), "--project-dir", str(self.project_dir.resolve()), ], }, } - # Get system prompt with project context - system_prompt = get_system_prompt(self.project_name, self.project_dir) + # NOTE: Use a short system_prompt to avoid Windows command line length limits (~8KB) + # The full project context will be passed in the greeting message instead + system_prompt = "You are a helpful project assistant with read-only access. Follow the context provided in your first message." + + # Get the full context to include in the greeting + project_context = get_system_prompt(self.project_name, self.project_dir) # Use system Claude CLI system_cli = shutil.which("claude") + logger.info(f"Creating assistant client for {self.project_name}") try: self.client = ClaudeSDKClient( @@ -202,19 +209,29 @@ async def start(self) -> AsyncGenerator[dict, None]: ) await self.client.__aenter__() self._client_entered = True + logger.info(f"Assistant client connected for {self.project_name}") except Exception as e: - logger.exception("Failed to create Claude client") + logger.exception(f"Failed to create Claude client: {type(e).__name__}: {e}") yield {"type": "error", "content": f"Failed to initialize assistant: {str(e)}"} return - # Send initial greeting + # Send project context to Claude as the first message to set up the assistant + # This is similar to how spec_chat_session sends skill content in the first message try: - greeting = f"Hello! I'm your project assistant for **{self.project_name}**. I can help you understand the codebase, explain features, and answer questions about the project. What would you like to know?" 
+ # Send context to Claude and get initial response + context_message = f"{project_context}\n\n---\n\nPlease introduce yourself briefly as the project assistant." + greeting_parts = [] + + async for chunk in self._query_claude(context_message): + if chunk.get("type") == "text": + greeting_parts.append(chunk.get("content", "")) + yield chunk - # Store the greeting in the database - add_message(self.project_dir, self.conversation_id, "assistant", greeting) + # Store the complete greeting in the database + greeting = "".join(greeting_parts) + if greeting: + add_message(self.project_dir, self.conversation_id, "assistant", greeting) - yield {"type": "text", "content": greeting} yield {"type": "response_done"} except Exception as e: logger.exception("Failed to send greeting") From 161397443ca4f9b600aaf0fb773fc89561d1c76d Mon Sep 17 00:00:00 2001 From: seocombat Date: Wed, 7 Jan 2026 01:23:18 +0100 Subject: [PATCH 5/9] feat: Add parallel agent execution with git worktrees MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements a coordinator-worker pattern for running multiple agents in parallel: - Add FeatureStatus enum (pending/in_progress/passing/conflict/failed) - Add atomic feature claiming via SQLite CTE with RETURNING - Add lease-based heartbeat for stale claim recovery (30min timeout) - Add WorktreeManager for git worktree lifecycle management - Add ParallelCoordinator for spawning/monitoring workers - Add parallel_agent.py CLI entry point (--workers 1-10) - Update process_manager to choose single vs parallel agent script - Add UI worker count selector (cycles 1→3→5) - Add ruff.toml config and fix lint errors Usage: CLI: python parallel_agent.py --project-dir my-app --workers 5 UI: Click users icon to select worker count before starting 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- agent.py | 1 - api/database.py | 76 +++- autonomous_agent_demo.py | 1 - client.py | 1 - 
mcp_server/feature_mcp.py | 345 ++++++++++++++- parallel_agent.py | 174 ++++++++ parallel_coordinator.py | 502 ++++++++++++++++++++++ ruff.toml | 22 + server/routers/agent.py | 11 +- server/routers/features.py | 20 +- server/routers/projects.py | 6 +- server/routers/spec_creation.py | 3 +- server/schemas.py | 7 + server/services/assistant_chat_session.py | 2 - server/services/process_manager.py | 53 ++- server/services/spec_chat_session.py | 1 - start.py | 4 +- start_ui.py | 4 +- ui/src/App.tsx | 1 + ui/src/components/AgentControl.tsx | 42 +- ui/src/components/NewProjectModal.tsx | 2 +- ui/src/hooks/useProjects.ts | 5 +- ui/src/lib/api.ts | 8 +- ui/src/lib/types.ts | 6 + ui/src/styles/globals.css | 10 + worktree_manager.py | 325 ++++++++++++++ 26 files changed, 1571 insertions(+), 61 deletions(-) create mode 100644 parallel_agent.py create mode 100644 parallel_coordinator.py create mode 100644 ruff.toml create mode 100644 worktree_manager.py diff --git a/agent.py b/agent.py index 7b6ef874..1a88307d 100644 --- a/agent.py +++ b/agent.py @@ -26,7 +26,6 @@ get_coding_prompt, get_coding_prompt_yolo, copy_spec_to_project, - has_project_prompts, ) diff --git a/api/database.py b/api/database.py index f05f92fc..cd4a8e47 100644 --- a/api/database.py +++ b/api/database.py @@ -5,10 +5,11 @@ SQLite database schema for feature storage using SQLAlchemy. 
""" +import enum from pathlib import Path from typing import Optional -from sqlalchemy import Boolean, Column, Integer, String, Text, create_engine +from sqlalchemy import Boolean, Column, DateTime, Enum, Integer, String, Text, create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker, Session from sqlalchemy.types import JSON @@ -16,6 +17,16 @@ Base = declarative_base() +class FeatureStatus(enum.Enum): + """Status of a feature in the queue.""" + + PENDING = "pending" # Available for claiming + IN_PROGRESS = "in_progress" # Claimed by a worker (leased) + PASSING = "passing" # Completed successfully + CONFLICT = "conflict" # Merge conflict; manual resolution needed + FAILED = "failed" # Agent could not complete (permanent) + + class Feature(Base): """Feature model representing a test case/feature to implement.""" @@ -27,9 +38,26 @@ class Feature(Base): name = Column(String(255), nullable=False) description = Column(Text, nullable=False) steps = Column(JSON, nullable=False) # Stored as JSON array + + # Status tracking (new enum-based status for parallel execution) + status = Column( + Enum(FeatureStatus, values_callable=lambda x: [e.value for e in x]), + default=FeatureStatus.PENDING, + index=True, + ) + + # Legacy fields - kept for backward compatibility passes = Column(Boolean, default=False, index=True) in_progress = Column(Boolean, default=False, index=True) + # Claim/lease tracking for parallel execution + claimed_by = Column(String(100), nullable=True) # Worker ID holding this feature + claimed_at = Column(DateTime, nullable=True) # Timestamp for lease expiry detection + + # Completion audit + completed_at = Column(DateTime, nullable=True) + completed_by = Column(String(100), nullable=True) + def to_dict(self) -> dict: """Convert feature to dictionary for JSON serialization.""" return { @@ -41,6 +69,11 @@ def to_dict(self) -> dict: "steps": self.steps, "passes": self.passes, "in_progress": self.in_progress, + 
"status": self.status.value if self.status else FeatureStatus.PENDING.value, + "claimed_by": self.claimed_by, + "claimed_at": self.claimed_at.isoformat() if self.claimed_at else None, + "completed_at": self.completed_at.isoformat() if self.completed_at else None, + "completed_by": self.completed_by, } @@ -58,19 +91,46 @@ def get_database_url(project_dir: Path) -> str: return f"sqlite:///{db_path.as_posix()}" -def _migrate_add_in_progress_column(engine) -> None: - """Add in_progress column to existing databases that don't have it.""" +def _migrate_database_schema(engine) -> None: + """Migrate existing databases to add new columns for parallel execution.""" from sqlalchemy import text with engine.connect() as conn: - # Check if column exists + # Check existing columns result = conn.execute(text("PRAGMA table_info(features)")) columns = [row[1] for row in result.fetchall()] + # Migration: in_progress column (legacy) if "in_progress" not in columns: - # Add the column with default value conn.execute(text("ALTER TABLE features ADD COLUMN in_progress BOOLEAN DEFAULT 0")) - conn.commit() + + # Migration: status column for parallel execution + if "status" not in columns: + conn.execute(text("ALTER TABLE features ADD COLUMN status TEXT DEFAULT 'pending'")) + # Migrate existing data: passes=True -> 'passing', in_progress=True -> 'in_progress' + conn.execute(text(""" + UPDATE features SET status = CASE + WHEN passes = 1 THEN 'passing' + WHEN in_progress = 1 THEN 'in_progress' + ELSE 'pending' + END + """)) + + # Migration: claim tracking columns + if "claimed_by" not in columns: + conn.execute(text("ALTER TABLE features ADD COLUMN claimed_by TEXT")) + + if "claimed_at" not in columns: + conn.execute(text("ALTER TABLE features ADD COLUMN claimed_at DATETIME")) + + # Migration: completion audit columns + if "completed_at" not in columns: + conn.execute(text("ALTER TABLE features ADD COLUMN completed_at DATETIME")) + + if "completed_by" not in columns: + conn.execute(text("ALTER 
TABLE features ADD COLUMN completed_by TEXT")) + + conn.commit() def create_database(project_dir: Path) -> tuple: @@ -87,8 +147,8 @@ def create_database(project_dir: Path) -> tuple: engine = create_engine(db_url, connect_args={"check_same_thread": False}) Base.metadata.create_all(bind=engine) - # Migrate existing databases to add in_progress column - _migrate_add_in_progress_column(engine) + # Migrate existing databases to add new columns + _migrate_database_schema(engine) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) return engine, SessionLocal diff --git a/autonomous_agent_demo.py b/autonomous_agent_demo.py index caa24916..ee074778 100644 --- a/autonomous_agent_demo.py +++ b/autonomous_agent_demo.py @@ -23,7 +23,6 @@ import argparse import asyncio -import os from pathlib import Path from dotenv import load_dotenv diff --git a/client.py b/client.py index a644d8c4..8330b3af 100644 --- a/client.py +++ b/client.py @@ -6,7 +6,6 @@ """ import json -import os import shutil import sys from pathlib import Path diff --git a/mcp_server/feature_mcp.py b/mcp_server/feature_mcp.py index 25a90749..87663687 100644 --- a/mcp_server/feature_mcp.py +++ b/mcp_server/feature_mcp.py @@ -22,17 +22,19 @@ import os import sys from contextlib import asynccontextmanager +from datetime import datetime from pathlib import Path from typing import Annotated from mcp.server.fastmcp import FastMCP from pydantic import BaseModel, Field +from sqlalchemy import text from sqlalchemy.sql.expression import func # Add parent directory to path so we can import from api module sys.path.insert(0, str(Path(__file__).parent.parent)) -from api.database import Feature, create_database +from api.database import Feature, FeatureStatus, create_database from api.migration import migrate_json_to_sqlite @@ -142,22 +144,33 @@ def get_session(): def feature_get_stats() -> str: """Get statistics about feature completion progress. 
- Returns the number of passing features, in-progress features, total features, + Returns the number of features by status, total features, and completion percentage. Use this to track overall progress of the implementation. Returns: - JSON with: passing (int), in_progress (int), total (int), percentage (float) + JSON with: passing, in_progress, pending, conflict, failed, total, percentage """ session = get_session() try: total = session.query(Feature).count() + + # Legacy boolean counts (for backward compatibility) passing = session.query(Feature).filter(Feature.passes == True).count() in_progress = session.query(Feature).filter(Feature.in_progress == True).count() + + # New status-based counts (for parallel execution) + pending = session.query(Feature).filter(Feature.status == FeatureStatus.PENDING).count() + conflict = session.query(Feature).filter(Feature.status == FeatureStatus.CONFLICT).count() + failed = session.query(Feature).filter(Feature.status == FeatureStatus.FAILED).count() + percentage = round((passing / total) * 100, 1) if total > 0 else 0.0 return json.dumps({ "passing": passing, "in_progress": in_progress, + "pending": pending, + "conflict": conflict, + "failed": failed, "total": total, "percentage": percentage }, indent=2) @@ -229,15 +242,18 @@ def feature_get_for_regression( @mcp.tool() def feature_mark_passing( - feature_id: Annotated[int, Field(description="The ID of the feature to mark as passing", ge=1)] + feature_id: Annotated[int, Field(description="The ID of the feature to mark as passing", ge=1)], + worker_id: Annotated[str, Field(default="", description="Worker ID that completed the feature (optional)")] = "" ) -> str: """Mark a feature as passing after successful implementation. - Updates the feature's passes field to true and clears the in_progress flag. + Updates the feature's passes field to true, sets status to PASSING, + and clears the in_progress flag and lease fields. 
Use this after you have implemented the feature and verified it works correctly. Args: feature_id: The ID of the feature to mark as passing + worker_id: Optional worker ID that completed this feature Returns: JSON with the updated feature details, or error if not found. @@ -249,8 +265,19 @@ def feature_mark_passing( if feature is None: return json.dumps({"error": f"Feature with ID {feature_id} not found"}) + # Update status and passes flag + feature.status = FeatureStatus.PASSING feature.passes = True feature.in_progress = False + + # Clear lease fields + feature.claimed_by = None + feature.claimed_at = None + + # Set completion audit + feature.completed_at = datetime.utcnow() + feature.completed_by = worker_id if worker_id else None + session.commit() session.refresh(feature) @@ -437,5 +464,313 @@ def feature_create_bulk( session.close() +# ============================================================================= +# Parallel Execution Tools +# ============================================================================= + + +@mcp.tool() +def feature_claim_next( + worker_id: Annotated[str, Field(description="Worker ID claiming the feature", min_length=1)] +) -> str: + """Atomically claim the next available feature for a worker. + + Uses SQLite's CTE with conditional UPDATE to ensure only one worker + can claim each feature (race-condition safe). This should be used + instead of feature_get_next + feature_mark_in_progress in parallel mode. + + Args: + worker_id: Unique identifier for the worker claiming the feature + + Returns: + JSON with claimed feature details, or {"status": "no_features_available"} + if no pending features exist. 
+ """ + session = get_session() + try: + now = datetime.utcnow() + + # Atomic claim using CTE + conditional UPDATE with RETURNING + # This ensures only one worker can claim each feature + result = session.execute(text(""" + WITH next AS ( + SELECT id FROM features + WHERE status = 'pending' + ORDER BY priority ASC, id ASC + LIMIT 1 + ) + UPDATE features + SET status = 'in_progress', + in_progress = 1, + claimed_by = :worker_id, + claimed_at = :now + WHERE id IN (SELECT id FROM next) + AND status = 'pending' + RETURNING * + """), {"worker_id": worker_id, "now": now}) + + row = result.fetchone() + if row is None: + session.rollback() + return json.dumps({"status": "no_features_available"}) + + session.commit() + + # Convert row to dict + columns = result.keys() + feature_dict = dict(zip(columns, row)) + + return json.dumps(feature_dict, indent=2, default=str) + except Exception as e: + session.rollback() + return json.dumps({"error": str(e)}) + finally: + session.close() + + +@mcp.tool() +def feature_heartbeat( + feature_id: Annotated[int, Field(description="The ID of the feature to heartbeat", ge=1)], + worker_id: Annotated[str, Field(description="Worker ID that owns the feature", min_length=1)] +) -> str: + """Extend lease on a claimed feature. Call every ~5 minutes while working. + + Updates the claimed_at timestamp to prevent stale claim recovery + from reclaiming this feature. + + Args: + feature_id: The ID of the feature + worker_id: Worker ID that should own this feature + + Returns: + JSON with {"status": "renewed"} on success, + or {"status": "lease_lost"} if this worker no longer owns the feature. 
+ """ + session = get_session() + try: + now = datetime.utcnow() + + result = session.execute(text(""" + UPDATE features + SET claimed_at = :now + WHERE id = :feature_id + AND claimed_by = :worker_id + AND status = 'in_progress' + """), {"feature_id": feature_id, "worker_id": worker_id, "now": now}) + + if result.rowcount == 0: + session.rollback() + return json.dumps({"status": "lease_lost"}) + + session.commit() + return json.dumps({"status": "renewed"}) + except Exception as e: + session.rollback() + return json.dumps({"error": str(e)}) + finally: + session.close() + + +@mcp.tool() +def feature_release_claim( + feature_id: Annotated[int, Field(description="The ID of the feature to release", ge=1)], + worker_id: Annotated[str, Field(description="Worker ID releasing the feature", min_length=1)] +) -> str: + """Release claim on a feature, returning it to pending status. + + Use this when abandoning a feature (e.g., agent failure, worker restart). + Only releases if the specified worker currently owns the feature. + + Args: + feature_id: The ID of the feature to release + worker_id: Worker ID that should own this feature + + Returns: + JSON with {"status": "released"} on success, + or {"status": "not_owner"} if this worker doesn't own the feature. 
def _finish_owned_feature(feature_id: int, worker_id: str, new_status: str, ok_status: str) -> str:
    """Move a feature owned by ``worker_id`` to ``new_status`` and drop its claim.

    Shared implementation behind ``feature_mark_conflict`` and
    ``feature_mark_failed``, which differ only in the status they write.

    Args:
        feature_id: Primary key of the feature row.
        worker_id: Worker that must currently hold the claim.
        new_status: Value written to ``features.status``.
        ok_status: Value reported under ``"status"`` when a row was updated.

    Returns:
        JSON string: ``{"status": ok_status}``, ``{"status": "not_owner"}``
        when no row matched, or ``{"error": ...}`` on any exception.
    """
    session = get_session()
    try:
        # The status is a bound parameter, never interpolated into the SQL.
        result = session.execute(text("""
            UPDATE features
            SET status = :new_status,
                passes = 0,
                in_progress = 0,
                claimed_by = NULL,
                claimed_at = NULL
            WHERE id = :feature_id
              AND claimed_by = :worker_id
        """), {"new_status": new_status, "feature_id": feature_id, "worker_id": worker_id})

        # Zero rows means the row is missing or claimed by someone else.
        if result.rowcount == 0:
            session.rollback()
            return json.dumps({"status": "not_owner"})

        session.commit()
        return json.dumps({"status": ok_status})
    except Exception as e:
        session.rollback()
        return json.dumps({"error": str(e)})
    finally:
        session.close()


@mcp.tool()
def feature_mark_conflict(
    feature_id: Annotated[int, Field(description="The ID of the feature with merge conflict", ge=1)],
    worker_id: Annotated[str, Field(description="Worker ID that encountered the conflict", min_length=1)]
) -> str:
    """Mark a feature as having a merge conflict.

    Use this when the feature's changes could not be merged into main.
    The feature will require manual resolution.

    Args:
        feature_id: The ID of the feature with conflict
        worker_id: Worker ID that encountered the conflict

    Returns:
        JSON with {"status": "marked_conflict"} on success,
        or {"status": "not_owner"} if this worker doesn't own the feature.
    """
    return _finish_owned_feature(feature_id, worker_id, "conflict", "marked_conflict")


@mcp.tool()
def feature_mark_failed(
    feature_id: Annotated[int, Field(description="The ID of the feature that failed", ge=1)],
    worker_id: Annotated[str, Field(description="Worker ID that encountered the failure", min_length=1)]
) -> str:
    """Mark a feature as permanently failed.

    Use this when the agent cannot complete the feature and it should
    not be retried automatically.

    Args:
        feature_id: The ID of the failed feature
        worker_id: Worker ID that encountered the failure

    Returns:
        JSON with {"status": "marked_failed"} on success,
        or {"status": "not_owner"} if this worker doesn't own the feature.
    """
    return _finish_owned_feature(feature_id, worker_id, "failed", "marked_failed")


@mcp.tool()
def feature_reclaim_stale(
    lease_timeout_minutes: Annotated[int, Field(default=30, ge=5, le=120, description="Minutes after which a claim is considered stale")] = 30
) -> str:
    """Reclaim features with expired leases.

    Used by the coordinator to recover from worker crashes.
    Features that have been in_progress for longer than lease_timeout
    without a heartbeat are returned to pending status.

    Args:
        lease_timeout_minutes: Minutes after which a claim is considered stale (5-120, default 30)

    Returns:
        JSON with {"reclaimed": count} indicating how many features were reclaimed.
    """
    # Local import: only the ``datetime`` class is known to be in module scope.
    from datetime import timedelta, timezone

    session = get_session()
    try:
        # The cutoff is computed in Python rather than with SQLite date math.
        # datetime.utcnow() is deprecated; take an aware UTC "now" and strip
        # tzinfo so the bound value is the same naive UTC timestamp as before.
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        cutoff = now_utc - timedelta(minutes=lease_timeout_minutes)

        result = session.execute(text("""
            UPDATE features
            SET status = 'pending',
                in_progress = 0,
                claimed_by = NULL,
                claimed_at = NULL
            WHERE status = 'in_progress'
              AND claimed_at < :cutoff
        """), {"cutoff": cutoff})

        reclaimed = result.rowcount
        session.commit()

        return json.dumps({"reclaimed": reclaimed})
    except Exception as e:
        session.rollback()
        return json.dumps({"error": str(e)})
    finally:
        session.close()


@mcp.tool()
def feature_is_project_complete() -> str:
    """Check if the project has no more automated work remaining.

    Returns true when there are no PENDING and no IN_PROGRESS features.
    Note: CONFLICT and FAILED features don't block completion.

    Returns:
        JSON with {"complete": bool, "pending": int, "in_progress": int},
        or {"error": message} if the database query fails.
    """
    session = get_session()
    try:
        pending = session.query(Feature).filter(Feature.status == FeatureStatus.PENDING).count()
        in_progress = session.query(Feature).filter(Feature.status == FeatureStatus.IN_PROGRESS).count()

        return json.dumps({
            "complete": pending == 0 and in_progress == 0,
            "pending": pending,
            "in_progress": in_progress
        })
    except Exception as e:
        # Report failures as JSON like the other tools here instead of
        # letting the exception escape to the caller.
        return json.dumps({"error": str(e)})
    finally:
        session.close()
def setup_logging(verbose: bool = False) -> None:
    """Configure root logging for the parallel agent.

    Args:
        verbose: When True log at DEBUG level, otherwise at INFO.
    """
    logging.basicConfig(
        level=logging.DEBUG if verbose else logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        datefmt="%H:%M:%S",
    )


def resolve_project_path(project_dir: str) -> Path:
    """Resolve a project directory from a path or a registered project name.

    Supports:
    - Home-relative paths: ~/myapp
    - Absolute paths: C:/Projects/myapp
    - Relative paths: ./myapp
    - Registered project names: myapp

    Args:
        project_dir: Value of ``--project-dir`` as typed by the user.

    Returns:
        The resolved absolute path. The path is not checked for existence.
    """
    path = Path(project_dir)

    # Expand "~" ourselves so the tool does not depend on shell expansion;
    # without this, "~/x" was treated as a name and then as a literal "./~/x".
    if project_dir == "~" or project_dir.startswith(("~/", "~\\")):
        try:
            return path.expanduser().resolve()
        except RuntimeError:
            # Home directory cannot be determined; use the legacy handling.
            pass

    # Anything that looks like a path is used directly.
    if path.is_absolute() or project_dir.startswith((".", "/", "\\")):
        return path.resolve()

    # A bare word is first tried as a registered project name.
    registered_path = get_project_path(project_dir)
    if registered_path:
        return Path(registered_path).resolve()

    # Unknown name: fall back to a path relative to the working directory.
    return path.resolve()


def validate_project(project_dir: Path) -> None:
    """Validate that the project is ready for parallel execution.

    Args:
        project_dir: Resolved project directory.

    Raises:
        ValueError: If the directory is missing, is not a directory, has no
            ``.git`` entry, or has no ``features.db``.
    """
    if not project_dir.exists():
        raise ValueError(f"Project directory does not exist: {project_dir}")

    # A regular file would otherwise be reported as "not a git repository".
    if not project_dir.is_dir():
        raise ValueError(f"Project path is not a directory: {project_dir}")

    # exists() rather than is_dir(): ".git" is a plain file inside worktrees.
    if not (project_dir / ".git").exists():
        raise ValueError(f"Project must be a git repository: {project_dir}")

    if not (project_dir / "features.db").exists():
        raise ValueError(
            f"No features.db found. Run the initializer agent first:\n"
            f" python autonomous_agent_demo.py --project-dir {project_dir}"
        )


def main() -> int:
    """Parse the command line, run the coordinator and return an exit code.

    Returns:
        0 when the coordinator finishes, 1 on validation or fatal errors,
        130 if a KeyboardInterrupt reaches this function.
    """
    parser = argparse.ArgumentParser(
        description="Run multiple agents in parallel using git worktrees",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python parallel_agent.py --project-dir /path/to/project
  python parallel_agent.py --project-dir my-project --workers 5
  python parallel_agent.py --project-dir my-project --workers 3 --yolo

The project must:
  1. Be a git repository
  2. Have features.db (run initializer first if needed)
  3. Have a clean working tree (uncommitted changes may cause issues)
        """,
    )

    parser.add_argument(
        "--project-dir",
        required=True,
        help="Project directory path or registered name",
    )
    parser.add_argument(
        "--workers",
        type=int,
        default=3,
        choices=range(1, 11),
        metavar="N",
        help="Number of parallel workers (1-10, default: 3)",
    )
    parser.add_argument(
        "--model",
        default="claude-sonnet-4-20250514",
        help="Claude model to use (default: claude-sonnet-4-20250514)",
    )
    parser.add_argument(
        "--yolo",
        action="store_true",
        help="YOLO mode: skip browser testing for faster iteration",
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable verbose logging",
    )

    args = parser.parse_args()

    setup_logging(args.verbose)
    logger = logging.getLogger(__name__)

    # Resolve and validate before printing anything else.
    try:
        project_dir = resolve_project_path(args.project_dir)
        validate_project(project_dir)
    except ValueError as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1

    print("\n" + "=" * 70)
    print(" PARALLEL AGENT EXECUTION")
    print("=" * 70)
    print(f"\nProject: {project_dir}")
    print(f"Workers: {args.workers}")
    print(f"Model: {args.model}")
    print(f"Mode: {'YOLO (testing disabled)' if args.yolo else 'Standard'}")
    print()

    coordinator = ParallelCoordinator(
        project_dir=project_dir,
        worker_count=args.workers,
        model=args.model,
        yolo_mode=args.yolo,
    )

    # The handlers only request a shutdown; the coordinator decides when
    # its loops actually stop.
    def signal_handler(signum, frame):
        print("\nReceived interrupt, shutting down gracefully...")
        coordinator.request_shutdown()

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    try:
        asyncio.run(coordinator.run())
        return 0
    except KeyboardInterrupt:
        print("\nInterrupted")
        return 130
    except Exception as e:
        logger.exception(f"Fatal error: {e}")
        return 1
class WorkerProcess:
    """A single worker: one claimed feature and one agent subprocess at a time.

    Claim bookkeeping is done by calling the ``mcp_server.feature_mcp`` tool
    functions in-process; the implementation work is delegated to
    ``autonomous_agent_demo.py`` running in the worker's worktree.
    """

    # Tools a worker may invoke through _run_mcp_tool.
    _TOOL_NAMES = frozenset({
        "feature_claim_next",
        "feature_heartbeat",
        "feature_release_claim",
        "feature_mark_passing",
        "feature_mark_conflict",
        "feature_mark_failed",
    })

    def __init__(
        self,
        worker_id: int,
        worktree_path: Path,
        project_dir: Path,
        model: str,
        yolo_mode: bool = False,
    ):
        """Store the worker configuration; nothing is started here.

        Args:
            worker_id: Numeric id; the claim owner name is ``worker-<id>``.
            worktree_path: Directory the agent subprocess runs in.
            project_dir: Main project directory.
            model: Model name forwarded to the agent subprocess.
            yolo_mode: When True the agent is started with ``--yolo``.
        """
        self.worker_id = worker_id
        self.worker_name = f"worker-{worker_id}"
        self.worktree_path = worktree_path
        self.project_dir = project_dir
        self.model = model
        self.yolo_mode = yolo_mode
        self.current_feature: Optional[dict] = None
        self.process: Optional[asyncio.subprocess.Process] = None
        self._heartbeat_task: Optional[asyncio.Task] = None

    async def claim_feature(self) -> Optional[dict]:
        """Claim the next available feature.

        Returns:
            The feature dict if one was claimed, otherwise None (nothing
            available, or the claim tool reported an error).
        """
        result = await self._run_mcp_tool("feature_claim_next", {"worker_id": self.worker_name})

        if result.get("status") == "no_features_available":
            return None

        if "error" in result:
            logger.error(f"Worker {self.worker_name} claim error: {result['error']}")
            return None

        self.current_feature = result
        return result

    async def heartbeat(self) -> bool:
        """Extend the lease on the current feature.

        Returns:
            True if the tool reported ``"renewed"``; False if there is no
            current feature or the lease was not renewed.
        """
        if not self.current_feature:
            return False

        result = await self._run_mcp_tool(
            "feature_heartbeat",
            {"feature_id": self.current_feature["id"], "worker_id": self.worker_name}
        )
        return result.get("status") == "renewed"

    async def _finish_current_feature(self, tool_name: str) -> None:
        """Call ``tool_name`` for the current feature, then forget the feature.

        No-op when nothing is claimed. The tool result is not inspected.
        """
        if not self.current_feature:
            return

        await self._run_mcp_tool(
            tool_name,
            {"feature_id": self.current_feature["id"], "worker_id": self.worker_name}
        )
        self.current_feature = None

    async def release_claim(self) -> None:
        """Release claim on current feature."""
        await self._finish_current_feature("feature_release_claim")

    async def mark_passing(self) -> None:
        """Mark current feature as passing."""
        await self._finish_current_feature("feature_mark_passing")

    async def mark_conflict(self) -> None:
        """Mark current feature as having merge conflict."""
        await self._finish_current_feature("feature_mark_conflict")

    async def mark_failed(self) -> None:
        """Mark current feature as permanently failed."""
        await self._finish_current_feature("feature_mark_failed")

    @staticmethod
    def _ensure_importable(directory: Path) -> None:
        """Put ``directory`` at the front of ``sys.path`` if it is not there yet."""
        entry = str(directory)
        if entry not in sys.path:
            sys.path.insert(0, entry)

    async def _run_mcp_tool(self, tool_name: str, params: dict) -> dict:
        """Run a feature tool by calling its Python function directly.

        The function is imported from ``mcp_server.feature_mcp`` and called
        in-process instead of going through a subprocess or the MCP protocol.

        Args:
            tool_name: One of ``_TOOL_NAMES``.
            params: Keyword arguments for the tool function.

        Returns:
            The decoded JSON result, or ``{"error": message}`` for an unknown
            tool or any exception raised while importing or calling it.
        """
        if tool_name not in self._TOOL_NAMES:
            return {"error": f"Unknown tool: {tool_name}"}

        try:
            # mcp_server lives next to this file, not next to the project.
            # Adding the path once also stops sys.path growing on every call.
            self._ensure_importable(Path(__file__).resolve().parent)
            import mcp_server.feature_mcp as feature_mcp

            tool_func = getattr(feature_mcp, tool_name)
            return json.loads(tool_func(**params))
        except Exception as e:
            logger.error(f"MCP tool error: {e}")
            return {"error": str(e)}

    async def _terminate_process(self, grace_seconds: float = 10.0) -> None:
        """Stop the agent subprocess and wait for it to exit.

        Sends terminate, waits up to ``grace_seconds``, then kills. Does
        nothing if no process is running.
        """
        proc = self.process
        if proc is None or proc.returncode is not None:
            return
        try:
            proc.terminate()
        except ProcessLookupError:
            return  # exited between the check and the signal
        try:
            await asyncio.wait_for(proc.wait(), timeout=grace_seconds)
        except asyncio.TimeoutError:
            proc.kill()
            await proc.wait()

    async def run_agent_session(self, feature: dict) -> bool:
        """Run one agent subprocess in the worktree for ``feature``.

        A heartbeat task keeps the lease alive for the duration of the run,
        and subprocess output is echoed with a ``[worker-N]`` prefix.

        Args:
            feature: Feature dict returned by ``claim_feature``.

        Returns:
            True if the subprocess exited with code 0.
        """
        logger.info(f"Worker {self.worker_name} starting feature: {feature.get('name', feature.get('id'))}")

        self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())

        try:
            # Argument list, no shell involved.
            cmd = [
                sys.executable,
                str(Path(__file__).parent / "autonomous_agent_demo.py"),
                "--project-dir", str(self.worktree_path),
                "--max-iterations", "1",
                # Forward the configured model; it was stored but never used.
                # NOTE(review): assumes autonomous_agent_demo.py accepts --model.
                "--model", self.model,
            ]

            if self.yolo_mode:
                cmd.append("--yolo")

            logger.debug(f"Worker {self.worker_name} running: {' '.join(cmd)}")

            self.process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.STDOUT,
                cwd=str(self.worktree_path),
            )

            # Echo output line by line until the pipe closes.
            while True:
                line = await self.process.stdout.readline()
                if not line:
                    break
                text = line.decode("utf-8", errors="replace").rstrip()
                print(f"[{self.worker_name}] {text}")

            await self.process.wait()

            success = self.process.returncode == 0
            logger.info(f"Worker {self.worker_name} finished feature with code: {self.process.returncode}")
            return success

        except asyncio.CancelledError:
            # Stop and reap the child so it does not outlive the worker.
            await self._terminate_process()
            raise

        finally:
            if self._heartbeat_task:
                self._heartbeat_task.cancel()
                try:
                    await self._heartbeat_task
                except asyncio.CancelledError:
                    pass
            self.process = None

    async def _heartbeat_loop(self) -> None:
        """Send a heartbeat every HEARTBEAT_INTERVAL seconds until it fails or is cancelled."""
        try:
            while True:
                await asyncio.sleep(HEARTBEAT_INTERVAL)
                if not await self.heartbeat():
                    logger.warning(f"Worker {self.worker_name} lost lease!")
                    break
        except asyncio.CancelledError:
            pass
class ParallelCoordinator:
    """
    Coordinates multiple parallel agent workers.

    Manages worktree lifecycle, spawns workers, handles merges,
    and monitors for stale claims.
    """

    def __init__(
        self,
        project_dir: Path,
        worker_count: int = 3,
        model: str = "claude-sonnet-4-20250514",
        yolo_mode: bool = False,
    ):
        """Store the configuration and create the shared synchronisation objects.

        Args:
            project_dir: Main project directory (a git repository).
            worker_count: Number of workers/worktrees to create.
            model: Model name handed to every worker.
            yolo_mode: Passed through to every worker.
        """
        self.project_dir = project_dir
        self.worker_count = worker_count
        self.model = model
        self.yolo_mode = yolo_mode

        self.worktree_mgr = WorktreeManager(project_dir, worker_count)
        self.workers: dict[int, "WorkerProcess"] = {}
        self.worktree_paths: dict[int, Path] = {}

        # Created before any loop is running; they are first awaited
        # inside run().
        self.shutdown_event = asyncio.Event()
        self.merge_lock = asyncio.Lock()

    @staticmethod
    def _call_feature_tool(tool_name: str, *args) -> dict:
        """Call a function from ``mcp_server.feature_mcp`` and decode its JSON result.

        The directory holding this file (where ``mcp_server`` lives) is added
        to ``sys.path`` at most once, so repeated calls do not grow it.
        """
        root = str(Path(__file__).resolve().parent)
        if root not in sys.path:
            sys.path.insert(0, root)
        import mcp_server.feature_mcp as feature_mcp

        return json.loads(getattr(feature_mcp, tool_name)(*args))

    async def _sleep_or_shutdown(self, seconds: float) -> bool:
        """Wait up to ``seconds``, returning early when shutdown is requested.

        Returns:
            True if shutdown was requested, False if the full time elapsed.
        """
        try:
            await asyncio.wait_for(self.shutdown_event.wait(), timeout=seconds)
        except asyncio.TimeoutError:
            return False
        return True

    async def run(self) -> None:
        """Set up worktrees, run all workers to completion, then clean up.

        Raises:
            RuntimeError: If the project directory is not a git repository.
        """
        logger.info(f"Starting parallel coordinator with {self.worker_count} workers")
        logger.info(f"Project: {self.project_dir}")
        logger.info(f"Mode: {'YOLO' if self.yolo_mode else 'Standard'}")

        if not await self.worktree_mgr.is_git_repo():
            raise RuntimeError(f"Not a git repository: {self.project_dir}")

        monitor_task: Optional[asyncio.Task] = None
        try:
            logger.info("Setting up worktrees...")
            self.worktree_paths = await self.worktree_mgr.setup_all_worktrees()

            for worker_id, worktree_path in self.worktree_paths.items():
                self.workers[worker_id] = WorkerProcess(
                    worker_id=worker_id,
                    worktree_path=worktree_path,
                    project_dir=self.project_dir,
                    model=self.model,
                    yolo_mode=self.yolo_mode,
                )
                logger.info(f"Created worker-{worker_id} at {worktree_path}")

            worker_tasks = {
                worker_id: asyncio.create_task(
                    self._run_worker_loop(worker_id),
                    name=f"worker-{worker_id}"
                )
                for worker_id in self.workers
            }

            monitor_task = asyncio.create_task(
                self._monitor_loop(),
                name="monitor"
            )

            logger.info("All workers started, waiting for completion...")
            await asyncio.gather(*worker_tasks.values())
            logger.info("All workers completed")

        finally:
            # Stop the monitor on every exit path, not only on success.
            if monitor_task is not None:
                monitor_task.cancel()
                try:
                    await monitor_task
                except asyncio.CancelledError:
                    pass

            logger.info("Cleaning up...")
            await self.worktree_mgr.cleanup_all()

            await self._print_final_summary()

    async def _run_worker_loop(self, worker_id: int) -> None:
        """Claim and process features for one worker until done or shut down.

        The loop ends when shutdown is requested or when the worker is idle
        and the project reports no remaining automated work.
        """
        worker = self.workers[worker_id]
        worktree_path = self.worktree_paths[worker_id]
        idle_backoff = IDLE_BACKOFF_INITIAL

        while not self.shutdown_event.is_set():
            try:
                result = await self._process_one_feature(worker, worktree_path)

                if result == "idle":
                    if await self._is_project_complete():
                        logger.info(f"Worker {worker.worker_name}: Project complete, exiting")
                        break

                    logger.debug(f"Worker {worker.worker_name}: Idle, backing off {idle_backoff}s")
                    # Shutdown-aware wait so a request is not delayed by the backoff.
                    await self._sleep_or_shutdown(idle_backoff)
                    idle_backoff = min(idle_backoff * 2, IDLE_BACKOFF_MAX)
                else:
                    # "success", "failed" and "conflict" all mean work was
                    # available, so the next idle period starts short again.
                    idle_backoff = IDLE_BACKOFF_INITIAL

            except asyncio.CancelledError:
                # Hand the feature back before propagating the cancellation.
                if worker.current_feature:
                    await worker.release_claim()
                raise

            except Exception as e:
                logger.exception(f"Worker {worker.worker_name} error: {e}")
                if worker.current_feature:
                    await worker.release_claim()
                await self._sleep_or_shutdown(idle_backoff)

    async def _process_one_feature(
        self,
        worker: "WorkerProcess",
        worktree_path: Path
    ) -> str:
        """Process a single feature: claim -> implement -> merge.

        Returns:
            "success" - feature completed and merged
            "idle" - no features available
            "failed" - agent session failed; the claim was released
            "conflict" - merge conflict (marked conflict)
        """
        # 1. Claim a feature
        feature = await worker.claim_feature()
        if feature is None:
            return "idle"

        feature_id = feature.get("id")
        feature_name = feature.get("name", f"Feature {feature_id}")
        logger.info(f"Worker {worker.worker_name} claimed: {feature_name}")

        # 2. Create feature branch in worktree
        feature_branch = await self.worktree_mgr.checkout_feature_branch(
            worker.worker_id, feature_id, worktree_path
        )

        # 3. Run agent session
        success = await worker.run_agent_session(feature)

        if not success:
            logger.warning(f"Worker {worker.worker_name}: Feature {feature_name} failed")
            # NOTE(review): the feature returns to pending with no retry
            # limit, so one that always fails is retried indefinitely.
            await worker.release_claim()  # Return to pending for retry
            return "failed"

        # 4. Merge; the lock lets only one worker merge at a time.
        async with self.merge_lock:
            merge_success, merge_msg = await self.worktree_mgr.merge_feature_branch(
                feature_branch, worktree_path
            )

            if merge_success:
                await worker.mark_passing()
                await self.worktree_mgr.delete_feature_branch(feature_branch)
                logger.info(f"Worker {worker.worker_name}: {feature_name} PASSING - {merge_msg}")
                return "success"

            await worker.mark_conflict()
            logger.warning(f"Worker {worker.worker_name}: {feature_name} CONFLICT - {merge_msg}")
            return "conflict"

    async def _monitor_loop(self) -> None:
        """Recover stale claims every MONITOR_INTERVAL seconds until shutdown."""
        try:
            while not await self._sleep_or_shutdown(MONITOR_INTERVAL):
                await self._recover_stale_claims()
        except asyncio.CancelledError:
            pass

    async def _recover_stale_claims(self) -> None:
        """Reclaim features with expired leases; errors are logged, not raised."""
        try:
            result = self._call_feature_tool("feature_reclaim_stale", LEASE_TIMEOUT_MINUTES)

            reclaimed = result.get("reclaimed", 0)
            if reclaimed > 0:
                logger.info(f"Reclaimed {reclaimed} stale features")

        except Exception as e:
            logger.error(f"Error reclaiming stale claims: {e}")

    async def _is_project_complete(self) -> bool:
        """Check if no more automated work remains (False on any error)."""
        try:
            result = self._call_feature_tool("feature_is_project_complete")
            return result.get("complete", False)

        except Exception as e:
            logger.error(f"Error checking project completion: {e}")
            return False

    async def _print_final_summary(self) -> None:
        """Print the final progress summary; errors are logged, not raised."""
        try:
            stats = self._call_feature_tool("feature_get_stats")

            print("\n" + "=" * 70)
            print(" PARALLEL EXECUTION COMPLETE")
            print("=" * 70)
            print(f"\nProject: {self.project_dir}")
            print(f"Workers: {self.worker_count}")
            print("\nResults:")
            print(f" Passing: {stats.get('passing', 0)}")
            print(f" Pending: {stats.get('pending', 0)}")
            print(f" In Progress: {stats.get('in_progress', 0)}")
            print(f" Conflicts: {stats.get('conflict', 0)}")
            print(f" Failed: {stats.get('failed', 0)}")
            print(f" Total: {stats.get('total', 0)}")
            print(f" Progress: {stats.get('percentage', 0)}%")
            print("=" * 70)

        except Exception as e:
            logger.error(f"Error printing summary: {e}")

    def request_shutdown(self) -> None:
        """Request graceful shutdown of all workers."""
        logger.info("Shutdown requested")
        self.shutdown_event.set()
+ """ manager = get_project_manager(project_name) - success, message = await manager.start(yolo_mode=request.yolo_mode) + success, message = await manager.start( + yolo_mode=request.yolo_mode, + parallel_workers=request.parallel_workers, + ) return AgentActionResponse( success=success, diff --git a/server/routers/features.py b/server/routers/features.py index 0caa7bad..620e58e9 100644 --- a/server/routers/features.py +++ b/server/routers/features.py @@ -110,7 +110,7 @@ async def list_features(project_name: str): raise HTTPException(status_code=404, detail=f"Project '{project_name}' not found in registry") if not project_dir.exists(): - raise HTTPException(status_code=404, detail=f"Project directory not found") + raise HTTPException(status_code=404, detail="Project directory not found") db_file = project_dir / "features.db" if not db_file.exists(): @@ -142,7 +142,7 @@ async def list_features(project_name: str): ) except HTTPException: raise - except Exception as e: + except Exception: logger.exception("Database error in list_features") raise HTTPException(status_code=500, detail="Database error occurred") @@ -157,7 +157,7 @@ async def create_feature(project_name: str, feature: FeatureCreate): raise HTTPException(status_code=404, detail=f"Project '{project_name}' not found in registry") if not project_dir.exists(): - raise HTTPException(status_code=404, detail=f"Project directory not found") + raise HTTPException(status_code=404, detail="Project directory not found") _, Feature = _get_db_classes() @@ -187,7 +187,7 @@ async def create_feature(project_name: str, feature: FeatureCreate): return feature_to_response(db_feature) except HTTPException: raise - except Exception as e: + except Exception: logger.exception("Failed to create feature") raise HTTPException(status_code=500, detail="Failed to create feature") @@ -202,7 +202,7 @@ async def get_feature(project_name: str, feature_id: int): raise HTTPException(status_code=404, detail=f"Project '{project_name}' not found 
in registry") if not project_dir.exists(): - raise HTTPException(status_code=404, detail=f"Project directory not found") + raise HTTPException(status_code=404, detail="Project directory not found") db_file = project_dir / "features.db" if not db_file.exists(): @@ -220,7 +220,7 @@ async def get_feature(project_name: str, feature_id: int): return feature_to_response(feature) except HTTPException: raise - except Exception as e: + except Exception: logger.exception("Database error in get_feature") raise HTTPException(status_code=500, detail="Database error occurred") @@ -235,7 +235,7 @@ async def delete_feature(project_name: str, feature_id: int): raise HTTPException(status_code=404, detail=f"Project '{project_name}' not found in registry") if not project_dir.exists(): - raise HTTPException(status_code=404, detail=f"Project directory not found") + raise HTTPException(status_code=404, detail="Project directory not found") _, Feature = _get_db_classes() @@ -252,7 +252,7 @@ async def delete_feature(project_name: str, feature_id: int): return {"success": True, "message": f"Feature {feature_id} deleted"} except HTTPException: raise - except Exception as e: + except Exception: logger.exception("Failed to delete feature") raise HTTPException(status_code=500, detail="Failed to delete feature") @@ -272,7 +272,7 @@ async def skip_feature(project_name: str, feature_id: int): raise HTTPException(status_code=404, detail=f"Project '{project_name}' not found in registry") if not project_dir.exists(): - raise HTTPException(status_code=404, detail=f"Project directory not found") + raise HTTPException(status_code=404, detail="Project directory not found") _, Feature = _get_db_classes() @@ -292,6 +292,6 @@ async def skip_feature(project_name: str, feature_id: int): return {"success": True, "message": f"Feature {feature_id} moved to end of queue"} except HTTPException: raise - except Exception as e: + except Exception: logger.exception("Failed to skip feature") raise 
HTTPException(status_code=500, detail="Failed to skip feature") diff --git a/server/routers/projects.py b/server/routers/projects.py index 2e190fba..f9571746 100644 --- a/server/routers/projects.py +++ b/server/routers/projects.py @@ -272,7 +272,7 @@ async def get_project_prompts(name: str): raise HTTPException(status_code=404, detail=f"Project '{name}' not found") if not project_dir.exists(): - raise HTTPException(status_code=404, detail=f"Project directory not found") + raise HTTPException(status_code=404, detail="Project directory not found") prompts_dir = _get_project_prompts_dir(project_dir) @@ -305,7 +305,7 @@ async def update_project_prompts(name: str, prompts: ProjectPromptsUpdate): raise HTTPException(status_code=404, detail=f"Project '{name}' not found") if not project_dir.exists(): - raise HTTPException(status_code=404, detail=f"Project directory not found") + raise HTTPException(status_code=404, detail="Project directory not found") prompts_dir = _get_project_prompts_dir(project_dir) prompts_dir.mkdir(parents=True, exist_ok=True) @@ -335,6 +335,6 @@ async def get_project_stats_endpoint(name: str): raise HTTPException(status_code=404, detail=f"Project '{name}' not found") if not project_dir.exists(): - raise HTTPException(status_code=404, detail=f"Project directory not found") + raise HTTPException(status_code=404, detail="Project directory not found") return get_project_stats(project_dir) diff --git a/server/routers/spec_creation.py b/server/routers/spec_creation.py index 639f1b57..ab10486e 100644 --- a/server/routers/spec_creation.py +++ b/server/routers/spec_creation.py @@ -5,12 +5,11 @@ WebSocket and REST endpoints for interactive spec creation with Claude. 
""" -import asyncio import json import logging import re from pathlib import Path -from typing import Any, Optional +from typing import Optional from fastapi import APIRouter, WebSocket, WebSocketDisconnect, HTTPException from pydantic import BaseModel, ValidationError diff --git a/server/schemas.py b/server/schemas.py index 723d4609..120a9c85 100644 --- a/server/schemas.py +++ b/server/schemas.py @@ -103,6 +103,12 @@ class FeatureListResponse(BaseModel): class AgentStartRequest(BaseModel): """Request schema for starting the agent.""" yolo_mode: bool = False + parallel_workers: int | None = Field( + default=None, + ge=1, + le=10, + description="Number of parallel workers (1-10). None or 1 = single agent mode" + ) class AgentStatus(BaseModel): @@ -111,6 +117,7 @@ class AgentStatus(BaseModel): pid: int | None = None started_at: datetime | None = None yolo_mode: bool = False + parallel_workers: int | None = None # None = single agent mode class AgentActionResponse(BaseModel): diff --git a/server/services/assistant_chat_session.py b/server/services/assistant_chat_session.py index 10e86905..bcbf50d7 100644 --- a/server/services/assistant_chat_session.py +++ b/server/services/assistant_chat_session.py @@ -9,7 +9,6 @@ import json import logging -import os import shutil import sys import threading @@ -22,7 +21,6 @@ from .assistant_database import ( create_conversation, add_message, - get_conversation, ) logger = logging.getLogger(__name__) diff --git a/server/services/process_manager.py b/server/services/process_manager.py index 31042cc7..35cfe02e 100644 --- a/server/services/process_manager.py +++ b/server/services/process_manager.py @@ -75,6 +75,7 @@ def __init__( self.started_at: datetime | None = None self._output_task: asyncio.Task | None = None self.yolo_mode: bool = False # YOLO mode for rapid prototyping + self.parallel_workers: int | None = None # Number of parallel workers (None = single agent) # Support multiple callbacks (for multiple WebSocket clients) 
self._output_callbacks: Set[Callable[[str], Awaitable[None]]] = set() @@ -148,11 +149,11 @@ def _check_lock(self) -> bool: try: pid = int(self.lock_file.read_text().strip()) if psutil.pid_exists(pid): - # Check if it's actually our agent process + # Check if it's actually our agent process (single or parallel) try: proc = psutil.Process(pid) cmdline = " ".join(proc.cmdline()) - if "autonomous_agent_demo.py" in cmdline: + if "autonomous_agent_demo.py" in cmdline or "parallel_agent.py" in cmdline: return False # Another agent is running except (psutil.NoSuchProcess, psutil.AccessDenied): pass @@ -215,12 +216,17 @@ async def _stream_output(self) -> None: self.status = "stopped" self._remove_lock() - async def start(self, yolo_mode: bool = False) -> tuple[bool, str]: + async def start( + self, + yolo_mode: bool = False, + parallel_workers: int | None = None + ) -> tuple[bool, str]: """ Start the agent as a subprocess. Args: yolo_mode: If True, run in YOLO mode (no browser testing) + parallel_workers: Number of parallel workers (None or 1 = single agent) Returns: Tuple of (success, message) @@ -231,16 +237,35 @@ async def start(self, yolo_mode: bool = False) -> tuple[bool, str]: if not self._check_lock(): return False, "Another agent instance is already running for this project" - # Store YOLO mode for status queries + # Store mode settings for status queries self.yolo_mode = yolo_mode - - # Build command - pass absolute path to project directory - cmd = [ - sys.executable, - str(self.root_dir / "autonomous_agent_demo.py"), - "--project-dir", - str(self.project_dir.resolve()), - ] + self.parallel_workers = parallel_workers if parallel_workers and parallel_workers > 1 else None + + # Determine which script to run + use_parallel = parallel_workers is not None and parallel_workers > 1 + + if use_parallel: + # Verify git repo for parallel mode + if not (self.project_dir / ".git").exists(): + return False, "Parallel mode requires a git repository" + + # Build command for 
parallel agent + cmd = [ + sys.executable, + str(self.root_dir / "parallel_agent.py"), + "--project-dir", + str(self.project_dir.resolve()), + "--workers", + str(parallel_workers), + ] + else: + # Build command for single agent + cmd = [ + sys.executable, + str(self.root_dir / "autonomous_agent_demo.py"), + "--project-dir", + str(self.project_dir.resolve()), + ] # Add --yolo flag if YOLO mode is enabled if yolo_mode: @@ -263,6 +288,8 @@ async def start(self, yolo_mode: bool = False) -> tuple[bool, str]: # Start output streaming task self._output_task = asyncio.create_task(self._stream_output()) + if use_parallel: + return True, f"Parallel agent started with {parallel_workers} workers (PID {self.process.pid})" return True, f"Agent started with PID {self.process.pid}" except Exception as e: logger.exception("Failed to start agent") @@ -307,6 +334,7 @@ async def stop(self) -> tuple[bool, str]: self.process = None self.started_at = None self.yolo_mode = False # Reset YOLO mode + self.parallel_workers = None # Reset parallel mode return True, "Agent stopped" except Exception as e: @@ -388,6 +416,7 @@ def get_status_dict(self) -> dict: "pid": self.pid, "started_at": self.started_at.isoformat() if self.started_at else None, "yolo_mode": self.yolo_mode, + "parallel_workers": self.parallel_workers, } diff --git a/server/services/spec_chat_session.py b/server/services/spec_chat_session.py index 8c70306c..24751612 100644 --- a/server/services/spec_chat_session.py +++ b/server/services/spec_chat_session.py @@ -6,7 +6,6 @@ Uses the create-spec.md skill to guide users through app spec creation. """ -import asyncio import json import logging import shutil diff --git a/start.py b/start.py index 9263edde..bad1661a 100644 --- a/start.py +++ b/start.py @@ -256,9 +256,9 @@ def run_manual_spec_flow(project_dir: Path) -> bool: print(" Manual Specification Setup") print("-" * 50) print("\nTemplate files have been created. 
Edit these files in your editor:") - print(f"\n Required:") + print("\n Required:") print(f" {prompts_dir / 'app_spec.txt'}") - print(f"\n Optional (customize agent behavior):") + print("\n Optional (customize agent behavior):") print(f" {prompts_dir / 'initializer_prompt.md'}") print(f" {prompts_dir / 'coding_prompt.md'}") print("\n" + "-" * 50) diff --git a/start_ui.py b/start_ui.py index 648a64a3..6cd757fb 100644 --- a/start_ui.py +++ b/start_ui.py @@ -158,9 +158,9 @@ def start_dev_server(port: int) -> tuple: """Start both Vite and FastAPI in development mode.""" venv_python = get_venv_python() - print(f"\n Starting development servers...") + print("\n Starting development servers...") print(f" - FastAPI backend: http://127.0.0.1:{port}") - print(f" - Vite frontend: http://127.0.0.1:5173") + print(" - Vite frontend: http://127.0.0.1:5173") # Start FastAPI backend = subprocess.Popen([ diff --git a/ui/src/App.tsx b/ui/src/App.tsx index 96066b24..6f53a7d8 100644 --- a/ui/src/App.tsx +++ b/ui/src/App.tsx @@ -164,6 +164,7 @@ function App() { projectName={selectedProject} status={wsState.agentStatus} yoloMode={agentStatusData?.yolo_mode ?? false} + parallelWorkers={agentStatusData?.parallel_workers ?? 
null} /> )} diff --git a/ui/src/components/AgentControl.tsx b/ui/src/components/AgentControl.tsx index 1b94fddd..101c4a7b 100644 --- a/ui/src/components/AgentControl.tsx +++ b/ui/src/components/AgentControl.tsx @@ -1,5 +1,5 @@ import { useState } from 'react' -import { Play, Pause, Square, Loader2, Zap } from 'lucide-react' +import { Play, Pause, Square, Loader2, Zap, Users } from 'lucide-react' import { useStartAgent, useStopAgent, @@ -12,10 +12,12 @@ interface AgentControlProps { projectName: string status: AgentStatus yoloMode?: boolean // From server status - whether currently running in YOLO mode + parallelWorkers?: number | null // From server status - current worker count } -export function AgentControl({ projectName, status, yoloMode = false }: AgentControlProps) { +export function AgentControl({ projectName, status, yoloMode = false, parallelWorkers = null }: AgentControlProps) { const [yoloEnabled, setYoloEnabled] = useState(false) + const [workerCount, setWorkerCount] = useState(null) // null = single agent const startAgent = useStartAgent(projectName) const stopAgent = useStopAgent(projectName) @@ -28,7 +30,7 @@ export function AgentControl({ projectName, status, yoloMode = false }: AgentCon pauseAgent.isPending || resumeAgent.isPending - const handleStart = () => startAgent.mutate(yoloEnabled) + const handleStart = () => startAgent.mutate({ yoloMode: yoloEnabled, parallelWorkers: workerCount }) const handleStop = () => stopAgent.mutate() const handlePause = () => pauseAgent.mutate() const handleResume = () => resumeAgent.mutate() @@ -48,6 +50,16 @@ export function AgentControl({ projectName, status, yoloMode = false }: AgentCon )} + {/* Parallel Workers Indicator - shown when running with multiple workers */} + {(status === 'running' || status === 'paused') && parallelWorkers && parallelWorkers > 1 && ( +
+ + + {parallelWorkers}x + +
+ )} + {/* Control Buttons */}
{status === 'stopped' || status === 'crashed' ? ( @@ -62,11 +74,33 @@ export function AgentControl({ projectName, status, yoloMode = false }: AgentCon > + {/* Parallel Workers Selector - cycles through 1/3/5 */} +