From 072bd084e15ede49aa51ffbcb9f33b88b6dd22de Mon Sep 17 00:00:00 2001 From: Gabi Date: Tue, 3 Feb 2026 14:11:55 +0100 Subject: [PATCH 1/2] fix: avoid heavy imports on package import --- src/autocoder/__init__.py | 69 ++++++++++++++++++++++++++++------ src/autocoder/core/__init__.py | 58 ++++++++++++++++++++++++---- 2 files changed, 108 insertions(+), 19 deletions(-) diff --git a/src/autocoder/__init__.py b/src/autocoder/__init__.py index c148dc05..5dcfb3ab 100644 --- a/src/autocoder/__init__.py +++ b/src/autocoder/__init__.py @@ -4,20 +4,31 @@ A powerful autonomous coding system with parallel agents, web UI, and MCP tools. """ -__version__ = "0.1.0" +from __future__ import annotations + +import importlib +from typing import TYPE_CHECKING, Any -# Core system exports -from autocoder.core.orchestrator import Orchestrator, create_orchestrator -from autocoder.core.gatekeeper import Gatekeeper -from autocoder.core.worktree_manager import WorktreeManager -from autocoder.core.knowledge_base import KnowledgeBase, get_knowledge_base -from autocoder.core.model_settings import ModelSettings, ModelPreset, get_full_model_id -from autocoder.core.test_framework_detector import TestFrameworkDetector -from autocoder.core.database import Database, get_database +__version__ = "0.1.0" -# Agent exports -from autocoder.agent.agent import run_autonomous_agent -from autocoder.agent.client import ClaudeSDKClient +_LAZY_EXPORTS: dict[str, tuple[str, str]] = { + # Core system + "Orchestrator": ("autocoder.core.orchestrator", "Orchestrator"), + "create_orchestrator": ("autocoder.core.orchestrator", "create_orchestrator"), + "Gatekeeper": ("autocoder.core.gatekeeper", "Gatekeeper"), + "WorktreeManager": ("autocoder.core.worktree_manager", "WorktreeManager"), + "KnowledgeBase": ("autocoder.core.knowledge_base", "KnowledgeBase"), + "get_knowledge_base": ("autocoder.core.knowledge_base", "get_knowledge_base"), + "ModelSettings": ("autocoder.core.model_settings", "ModelSettings"), + "ModelPreset": ("autocoder.core.model_settings", "ModelPreset"), + "get_full_model_id": ("autocoder.core.model_settings", "get_full_model_id"), + "TestFrameworkDetector": ("autocoder.core.test_framework_detector", "TestFrameworkDetector"), + "Database": ("autocoder.core.database", "Database"), + "get_database": ("autocoder.core.database", "get_database"), + # Agent + "run_autonomous_agent": ("autocoder.agent.agent", "run_autonomous_agent"), + "ClaudeSDKClient": ("autocoder.agent.client", "ClaudeSDKClient"), +} __all__ = [ # Core system @@ -37,3 +48,37 @@ "run_autonomous_agent", "ClaudeSDKClient", ] + +if TYPE_CHECKING: + from autocoder.agent.agent import run_autonomous_agent as run_autonomous_agent + from autocoder.agent.client import ClaudeSDKClient as ClaudeSDKClient + from autocoder.core.database import Database as Database + from autocoder.core.database import get_database as get_database + from autocoder.core.gatekeeper import Gatekeeper as Gatekeeper + from autocoder.core.knowledge_base import KnowledgeBase as KnowledgeBase + from autocoder.core.knowledge_base import get_knowledge_base as get_knowledge_base + from autocoder.core.model_settings import ModelPreset as ModelPreset + from autocoder.core.model_settings import ModelSettings as ModelSettings + from autocoder.core.model_settings import get_full_model_id as get_full_model_id + from autocoder.core.orchestrator import Orchestrator as Orchestrator + from autocoder.core.orchestrator import create_orchestrator as create_orchestrator + from autocoder.core.test_framework_detector import ( + TestFrameworkDetector as TestFrameworkDetector, + ) + from autocoder.core.worktree_manager import WorktreeManager as WorktreeManager + + +def __getattr__(name: str) -> Any: + spec = _LAZY_EXPORTS.get(name) + if not spec: + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") + + module_name, attr_name = spec + module = importlib.import_module(module_name) + value = getattr(module, attr_name) + globals()[name] = value # Cache for future access + return value + + +def __dir__() -> list[str]: + return sorted(list(globals().keys()) + list(_LAZY_EXPORTS.keys())) diff --git a/src/autocoder/core/__init__.py b/src/autocoder/core/__init__.py index e96dd962..7044d068 100644 --- a/src/autocoder/core/__init__.py +++ b/src/autocoder/core/__init__.py @@ -11,13 +11,25 @@ - Database: SQLite database wrapper """ -from autocoder.core.orchestrator import Orchestrator, create_orchestrator -from autocoder.core.gatekeeper import Gatekeeper -from autocoder.core.worktree_manager import WorktreeManager -from autocoder.core.knowledge_base import KnowledgeBase, get_knowledge_base -from autocoder.core.model_settings import ModelSettings, ModelPreset, get_full_model_id -from autocoder.core.test_framework_detector import TestFrameworkDetector -from autocoder.core.database import Database, get_database +from __future__ import annotations + +import importlib +from typing import TYPE_CHECKING, Any + +_LAZY_EXPORTS: dict[str, tuple[str, str]] = { + "Orchestrator": ("autocoder.core.orchestrator", "Orchestrator"), + "create_orchestrator": ("autocoder.core.orchestrator", "create_orchestrator"), + "Gatekeeper": ("autocoder.core.gatekeeper", "Gatekeeper"), + "WorktreeManager": ("autocoder.core.worktree_manager", "WorktreeManager"), + "KnowledgeBase": ("autocoder.core.knowledge_base", "KnowledgeBase"), + "get_knowledge_base": ("autocoder.core.knowledge_base", "get_knowledge_base"), + "ModelSettings": ("autocoder.core.model_settings", "ModelSettings"), + "ModelPreset": ("autocoder.core.model_settings", "ModelPreset"), + "get_full_model_id": ("autocoder.core.model_settings", "get_full_model_id"), + "TestFrameworkDetector": ("autocoder.core.test_framework_detector", "TestFrameworkDetector"), + "Database": ("autocoder.core.database", "Database"), + "get_database": ("autocoder.core.database", "get_database"), +} __all__ = [ "Orchestrator", @@ -33,3 +45,35 @@ "Database", "get_database", ] + +if TYPE_CHECKING: + from autocoder.core.database import Database as Database + from autocoder.core.database import get_database as get_database + from autocoder.core.gatekeeper import Gatekeeper as Gatekeeper + from autocoder.core.knowledge_base import KnowledgeBase as KnowledgeBase + from autocoder.core.knowledge_base import get_knowledge_base as get_knowledge_base + from autocoder.core.model_settings import ModelPreset as ModelPreset + from autocoder.core.model_settings import ModelSettings as ModelSettings + from autocoder.core.model_settings import get_full_model_id as get_full_model_id + from autocoder.core.orchestrator import Orchestrator as Orchestrator + from autocoder.core.orchestrator import create_orchestrator as create_orchestrator + from autocoder.core.test_framework_detector import ( + TestFrameworkDetector as TestFrameworkDetector, + ) + from autocoder.core.worktree_manager import WorktreeManager as WorktreeManager + + +def __getattr__(name: str) -> Any: + spec = _LAZY_EXPORTS.get(name) + if not spec: + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") + + module_name, attr_name = spec + module = importlib.import_module(module_name) + value = getattr(module, attr_name) + globals()[name] = value # Cache for future access + return value + + +def __dir__() -> list[str]: + return sorted(list(globals().keys()) + list(_LAZY_EXPORTS.keys())) From 8f1945f428ceddb303bbabb589298a2fd45eb6f6 Mon Sep 17 00:00:00 2001 From: Gabi Date: Tue, 3 Feb 2026 14:16:36 +0100 Subject: [PATCH 2/2] core: harden sqlite feature claiming with atomic updates --- src/autocoder/core/database.py | 266 ++++++++++++++++++++++----------- tests/test_db_atomicity.py | 78 ++++++++++ 2 files changed, 254 insertions(+), 90 deletions(-) create mode 100644 tests/test_db_atomicity.py diff --git a/src/autocoder/core/database.py b/src/autocoder/core/database.py index 7313d3ff..5292b8fd 100644 --- a/src/autocoder/core/database.py +++ b/src/autocoder/core/database.py @@ -13,6 +13,7 @@ import sqlite3 import json +import contextlib import logging import os import random @@ -152,6 +153,26 @@ def get_connection(self): finally: conn.close() + @contextmanager + def atomic_transaction(self): + """ + Execute a transaction with an early write lock for cross-process safety. + + SQLite's default DEFERRED transactions can allow multiple writers to race through + SELECT-before-UPDATE patterns. BEGIN IMMEDIATE acquires a RESERVED lock up-front, + preventing competing writers from entering write mode mid-transaction. + """ + with self.get_connection() as conn: + cursor = conn.cursor() + cursor.execute("BEGIN IMMEDIATE") + try: + yield conn, cursor + conn.commit() + except Exception: + with contextlib.suppress(Exception): + conn.rollback() + raise + def _init_schema(self): """Initialize database schema.""" with self.get_connection() as conn: @@ -1181,65 +1202,92 @@ def claim_next_pending_feature( """ Atomically claim the next pending feature. - This is safe under concurrency: it uses an UPDATE gated by status='PENDING' - and retries if another agent wins the race. + This is safe under concurrency: it uses an atomic UPDATE to avoid + SELECT-before-UPDATE snapshot races. """ if not agent_id: raise ValueError("agent_id is required to claim a feature") - with self.get_connection() as conn: - cursor = conn.cursor() - - order_by = "f.priority DESC, f.id ASC" - if prioritize_blockers: - order_by = ( - "f.priority DESC, " - "(SELECT COUNT(1) FROM feature_dependencies dd " - " JOIN features fx ON fx.id = dd.feature_id " - " WHERE dd.depends_on_id = f.id AND fx.status IN ('PENDING','IN_PROGRESS')) DESC, " - "f.id ASC" - ) - - for _ in range(max_attempts): - cursor.execute( - f""" - SELECT f.id, f.branch_name FROM features f - WHERE f.status = 'PENDING' - AND f.enabled = 1 - AND (f.next_attempt_at IS NULL OR f.next_attempt_at <= CURRENT_TIMESTAMP) - AND NOT EXISTS ( - SELECT 1 - FROM feature_dependencies d - JOIN features dep ON dep.id = d.depends_on_id - WHERE d.feature_id = f.id AND dep.status != 'DONE' - ) - ORDER BY {order_by} - LIMIT 1 - """ - ) - row = cursor.fetchone() - if not row: - return None + order_by = "f.priority DESC, f.id ASC" + if prioritize_blockers: + order_by = ( + "f.priority DESC, " + "(SELECT COUNT(1) FROM feature_dependencies dd " + " JOIN features fx ON fx.id = dd.feature_id " + " WHERE dd.depends_on_id = f.id AND fx.status IN ('PENDING','IN_PROGRESS')) DESC, " + "f.id ASC" + ) - feature_id = int(row[0]) - existing_branch = str(row[1] or "").strip() - branch_name = existing_branch or f"{branch_prefix}/{feature_id}-{int(time.time())}" + select_subquery = f""" + SELECT f.id + FROM features f + WHERE f.status = 'PENDING' + AND f.enabled = 1 + AND (f.next_attempt_at IS NULL OR f.next_attempt_at <= CURRENT_TIMESTAMP) + AND NOT EXISTS ( + SELECT 1 + FROM feature_dependencies d + JOIN features dep ON dep.id = d.depends_on_id + WHERE d.feature_id = f.id AND dep.status != 'DONE' + ) + ORDER BY {order_by} + LIMIT 1 + """ - cursor.execute(""" - UPDATE features - SET status = 'IN_PROGRESS', - assigned_agent_id = ?, - assigned_at = CURRENT_TIMESTAMP, - branch_name = COALESCE(branch_name, ?), - updated_at = CURRENT_TIMESTAMP - WHERE id = ? AND status = 'PENDING' - """, (agent_id, branch_name, feature_id)) + def _try_returning(cursor: sqlite3.Cursor) -> Optional[int]: + cursor.execute( + f""" + UPDATE features + SET status = 'IN_PROGRESS', + assigned_agent_id = ?, + assigned_at = CURRENT_TIMESTAMP, + branch_name = COALESCE( + NULLIF(TRIM(branch_name), ''), + ? || '/' || id || '-' || CAST(strftime('%s','now') AS TEXT) + ), + updated_at = CURRENT_TIMESTAMP + WHERE id = ({select_subquery}) + AND status = 'PENDING' + RETURNING id + """, + (agent_id, str(branch_prefix or "feat")), + ) + row = cursor.fetchone() + return int(row[0]) if row else None - if cursor.rowcount > 0: - conn.commit() - return self.get_feature(feature_id) + feature_id: int | None = None + with self.atomic_transaction() as (_conn, cursor): + try: + feature_id = _try_returning(cursor) + except sqlite3.OperationalError: + # Older SQLite builds may not support RETURNING. Fall back to a safe loop + # while holding the write lock (BEGIN IMMEDIATE). + for _ in range(max_attempts): + cursor.execute(select_subquery) + row = cursor.fetchone() + if not row: + break + candidate = int(row[0]) + cursor.execute( + """ + UPDATE features + SET status = 'IN_PROGRESS', + assigned_agent_id = ?, + assigned_at = CURRENT_TIMESTAMP, + branch_name = COALESCE( + NULLIF(TRIM(branch_name), ''), + ? || '/' || id || '-' || CAST(strftime('%s','now') AS TEXT) + ), + updated_at = CURRENT_TIMESTAMP + WHERE id = ? AND status = 'PENDING' + """, + (agent_id, str(branch_prefix or "feat"), candidate), + ) + if cursor.rowcount > 0: + feature_id = candidate + break - return None + return self.get_feature(feature_id) if feature_id else None def get_pending_queue_state(self) -> Dict[str, Any]: """ @@ -1664,43 +1712,83 @@ def claim_batch( Returns: List of claimed feature IDs """ - with self.get_connection() as conn: - cursor = conn.cursor() - - claimed_ids = [] - - # Claim features one by one (transaction provides locking) - for i in range(count): - cursor.execute(""" - SELECT id FROM features - WHERE status = 'PENDING' - AND enabled = 1 - ORDER BY priority DESC, id ASC - LIMIT 1 - """) - - row = cursor.fetchone() - if not row: - break # No more pending features + if count <= 0: + return [] + if not agent_id: + raise ValueError("agent_id is required to claim features") - feature_id = row[0] - branch_name = branch_names[i] if i < len(branch_names) else f"feat/{feature_id}" + claimed_ids: list[int] = [] - cursor.execute(""" - UPDATE features - SET status = 'IN_PROGRESS', - assigned_agent_id = ?, - assigned_at = CURRENT_TIMESTAMP, - branch_name = ?, - updated_at = CURRENT_TIMESTAMP - WHERE id = ? AND status = 'PENDING' AND enabled = 1 - """, (agent_id, branch_name, feature_id)) - - if cursor.rowcount > 0: - claimed_ids.append(feature_id) + with self.atomic_transaction() as (_conn, cursor): + for i in range(int(count)): + desired_branch = str(branch_names[i] if i < len(branch_names) else "").strip() + try: + cursor.execute( + """ + UPDATE features + SET status = 'IN_PROGRESS', + assigned_agent_id = ?, + assigned_at = CURRENT_TIMESTAMP, + branch_name = COALESCE( + NULLIF(TRIM(branch_name), ''), + NULLIF(?, ''), + 'feat/' || id || '-' || CAST(strftime('%s','now') AS TEXT) + ), + updated_at = CURRENT_TIMESTAMP + WHERE id = ( + SELECT id + FROM features + WHERE status = 'PENDING' + AND enabled = 1 + ORDER BY priority DESC, id ASC + LIMIT 1 + ) + AND status = 'PENDING' + AND enabled = 1 + RETURNING id + """, + (agent_id, desired_branch), + ) + row = cursor.fetchone() + if not row: + break + claimed_ids.append(int(row[0])) + except sqlite3.OperationalError: + # Fallback without RETURNING (still safe under BEGIN IMMEDIATE). + cursor.execute( + """ + SELECT id + FROM features + WHERE status = 'PENDING' + AND enabled = 1 + ORDER BY priority DESC, id ASC + LIMIT 1 + """ + ) + row = cursor.fetchone() + if not row: + break + feature_id = int(row[0]) + cursor.execute( + """ + UPDATE features + SET status = 'IN_PROGRESS', + assigned_agent_id = ?, + assigned_at = CURRENT_TIMESTAMP, + branch_name = COALESCE( + NULLIF(TRIM(branch_name), ''), + NULLIF(?, ''), + 'feat/' || id || '-' || CAST(strftime('%s','now') AS TEXT) + ), + updated_at = CURRENT_TIMESTAMP + WHERE id = ? AND status = 'PENDING' AND enabled = 1 + """, + (agent_id, desired_branch, feature_id), + ) + if cursor.rowcount > 0: + claimed_ids.append(feature_id) - conn.commit() - return claimed_ids + return claimed_ids def update_feature_status( self, @@ -1734,7 +1822,7 @@ def mark_feature_passing(self, feature_id: int) -> bool: review_status = 'VERIFIED', completed_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP - WHERE id = ? + WHERE id = ? AND COALESCE(passes, 0) = 0 """, (feature_id,)) conn.commit() return cursor.rowcount > 0 @@ -1754,7 +1842,7 @@ def mark_feature_ready_for_verification(self, feature_id: int) -> bool: passes = FALSE, review_status = 'READY_FOR_VERIFICATION', updated_at = CURRENT_TIMESTAMP - WHERE id = ? + WHERE id = ? AND COALESCE(passes, 0) = 0 """, (feature_id,), ) @@ -1772,8 +1860,7 @@ def mark_feature_failed( next_status: str | None = None, ) -> bool: """Mark a feature as failed (reset for retry).""" - with self.get_connection() as conn: - cursor = conn.cursor() + with self.atomic_transaction() as (_conn, cursor): cursor.execute( "SELECT attempts, last_error_key, same_error_streak, last_diff_fingerprint, same_diff_streak, branch_name FROM features WHERE id = ?", (feature_id,), @@ -1876,7 +1963,6 @@ def mark_feature_failed( feature_id, ), ) - conn.commit() return cursor.rowcount > 0 def force_retry_feature(self, feature_id: int, *, preserve_branch: bool = True) -> bool: diff --git a/tests/test_db_atomicity.py b/tests/test_db_atomicity.py new file mode 100644 index 00000000..1cc0d751 --- /dev/null +++ b/tests/test_db_atomicity.py @@ -0,0 +1,78 @@ +from __future__ import annotations + +import multiprocessing as mp +from pathlib import Path + + +def _claim_one(db_dir: str, agent_id: str, out_q) -> None: # type: ignore[no-untyped-def] + from autocoder.core.database import get_database + + db = get_database(db_dir) + claimed = db.claim_next_pending_feature(agent_id) + out_q.put(int(claimed["id"]) if claimed else None) + + +def test_claim_next_is_unique_under_multiprocessing(tmp_path: Path) -> None: + """ + Ensure concurrent claims cannot claim the same feature twice. + + This is a best-effort simulation of orchestrator/worker/MCP concurrent access on Windows + where multiprocessing uses spawn semantics. + """ + from autocoder.core.database import get_database + + project_dir = tmp_path + db = get_database(str(project_dir)) + + # Create a small pool of claimable features. + total_features = 12 + for i in range(total_features): + db.create_feature( + name=f"Feature {i}", + description="test", + category="test", + priority=0, + ) + + ctx = mp.get_context("spawn") + q = ctx.Queue() + procs: list[mp.Process] = [] + + concurrent_claimers = 10 + for i in range(concurrent_claimers): + p = ctx.Process(target=_claim_one, args=(str(project_dir), f"agent-{i}", q)) + p.start() + procs.append(p) + + results: list[int | None] = [] + for _ in range(concurrent_claimers): + results.append(q.get(timeout=30)) + + for p in procs: + p.join(timeout=30) + assert p.exitcode == 0 + + claimed_ids = [r for r in results if r is not None] + assert len(claimed_ids) == len(set(claimed_ids)) + assert len(claimed_ids) <= total_features + + # Verify assigned agent IDs were recorded. + for fid in claimed_ids: + feature = db.get_feature(int(fid)) + assert feature is not None + assert str(feature.get("assigned_agent_id") or "").strip() != "" + + +def test_mark_passing_is_idempotent(tmp_path: Path) -> None: + from autocoder.core.database import get_database + + db = get_database(str(tmp_path)) + feature_id = db.create_feature( + name="Feature", + description="test", + category="test", + priority=0, + ) + + assert db.mark_feature_passing(int(feature_id)) is True + assert db.mark_feature_passing(int(feature_id)) is False