diff --git a/.console/backlog.md b/.console/backlog.md index e3266ba4..3d75081f 100644 --- a/.console/backlog.md +++ b/.console/backlog.md @@ -8,6 +8,8 @@ _(none — system locked at Rev 10)_ ## Up Next +- [x] **Phase 6 — dispatch control crash-safety + dual-PID tracking (2026-05-04, branch phase6-dispatch-control)**: All 6 slices complete. New `audit_dispatch/lock_store.py` (PersistentLockStore + dual-PID payload, atomic writes, fcntl sentinel via audit_governance/file_locks); `locks.py` refactored as façade over the store with full-identity acquire signature; `executor.execute()` accepts `on_spawn(pid, pgid)` callback; `api.py` carries identity through; stale-PID reclaim + lazy first-use sweep; new CLI commands `list-active / unlock / dispatch / watch` on `operations-center-audit`; cross-process concurrency proof test; in-flight run_status watcher (polling, no watchdog dep). Sentinel-glob bug fixed (`_iter_lock_files` filters `.lock.lock` recursive sentinels). Tests: 64 new + all existing passing. Full unit suite 2041 pass. + - [ ] Phase 13 or next operator directive TBD ## Done diff --git a/.console/log.md b/.console/log.md index bfb6cea1..5a5bcc31 100644 --- a/.console/log.md +++ b/.console/log.md @@ -3,10 +3,17 @@ _Chronological continuity log. Decisions, stop points, what changed and why._ _Not a task tracker — that's backlog.md. Keep entries concise and dated._ +## Stop Points + +- Phase 6 — dispatch control crash-safety + dual-PID tracking (2026-05-04, branch `phase6-dispatch-control`): All 6 slices landed. (A) New `audit_dispatch/lock_store.py` with `PersistentLockStore` + `PersistentLockPayload`; atomic write-tempfile + `os.replace`; `fcntl.flock` sentinel via the existing `audit_governance/file_locks.locked_state_file` helper (lazy-imported to avoid `audit_governance` package-init cycle). Dual-PID payload baked in: `oc_pid` (supervisor) + `audit_pid` (subprocess) + `audit_pgid`. 
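The atomic-write discipline named above (write-tempfile + `os.replace`) can be sketched standalone. This is an illustrative sketch, not the actual `lock_store` code; the function name `write_lock_atomic` is hypothetical:

```python
import json
import os
import tempfile
from pathlib import Path


def write_lock_atomic(path: Path, payload: dict) -> None:
    """Illustrative sketch: write payload as JSON so readers never see a partial file.

    The payload is fully written to a sibling temp file on the same
    filesystem, then renamed over the target. os.replace is atomic on
    POSIX, so concurrent readers see either the old or the new payload.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp = tempfile.mkstemp(prefix=f"{path.name}.", suffix=".tmp", dir=str(path.parent))
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2, ensure_ascii=False)
            f.write("\n")
        os.replace(tmp, path)  # atomic rename over the target
    except Exception:
        # Best-effort cleanup of the orphaned temp file, then re-raise.
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise
```

A crash between `mkstemp` and `os.replace` leaves only a `.tmp` orphan, never a truncated `.lock` file, which is what makes first-use sweeps safe.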
(B) `locks.py` refactored to delegate persistence to the store; `acquire_audit_lock(repo_id, *, run_id, audit_type, oc_pid, command, expected_run_status_path)` carries identity. `executor.execute()` accepts `on_spawn(pid, pgid)` callback wired to `lock.update_audit_pid` so the subprocess PID is patched into the lock immediately after Popen. `api.py` resolves the absolute expected output dir and threads it through. (C) Stale-reclaim policy: a lock is alive iff *any* recorded PID is alive (`os.kill(pid, 0)`). Fresh registry sweeps on first use to recover from OC crash. Corrupt lock files treated as stale (operator can `unlock --force`). (D) New CLI commands on `operations-center-audit`: `list-active` (Rich table or `--json`), `unlock --repo X [--force]`, `dispatch <repo_id> <audit_type>` positional alias, `watch --repo X` (in-flight `run_status.json` polling). (E) `tests/unit/audit_dispatch/test_lock_store_concurrency.py` spawns two real subprocesses competing for the same repo lock; asserts exactly one acquires. (F) New `watcher.py` exposes `poll_run_status(expected_output_dir, run_id)` iterator yielding `RunStatusSnapshot` on each on-disk content change, terminating on `completed/failed/interrupted`; locates VF buckets by `run_id` substring match per the existing report-naming convention (no `watchdog` dep). Sentinel-glob bug caught in test: `_iter_lock_files` filters out `*.lock.lock`-style sentinels so sweep doesn't recursively wrap. Test counts: 22 lock_store + 18 locks + 18 audit-cli + 2 cross-process + 4 watcher tests; full unit suite 2041 pass (pre-existing `architecture_invariants` collection error on main). + ## Recent Decisions | Decision | Rationale | Date | |----------|-----------|------| +| Phase 6 dual-PID lock (oc_pid + audit_pid) | Single-PID design left orphaned audit subprocesses invisible to liveness checks (OC dies, audit lives → lock looks reclaimable but artifact writes still happening).
Dual-PID treats lock as alive iff any recorded PID is alive — orphaned audit holds the lock until it exits, which is correct. | 2026-05-04 | +| Persistent lock at state/audit_dispatch/locks/{repo_id}.lock | Matches existing OC `state/{subsystem}/...` convention. JSON payload, atomic os.replace. fcntl.flock sentinel via existing audit_governance file_locks helper — no new dep, cross-process exclusion proven by test_lock_store_concurrency.py. | 2026-05-04 | +| Polling over watchdog for in-flight run_status observation | watchdog is not a current OC runtime dep. Polling at 2s default interval is adequate for human-observable lifecycle transitions (running→completed). No new runtime dep. | 2026-05-04 | | C41 json.dumps ensure_ascii=False | 131 json.dumps calls across 43 files now include ensure_ascii=False; prevents silent Unicode escaping in logs and payloads | 2026-05-03 | | T4 orphan fixtures deleted | Custodian T4 detector flagged default_proposal(), default_decision() (policy/conftest.py) and index_from_example_failed() (behavior_calibration/conftest.py) as never requested; all removed; 279 tests pass | 2026-05-02 | | `artifact_manifest_path` is `Optional[str]` in model | VF doesn't write it yet; `is_compliant` enforces it without rejecting legacy files | 2026-04-26 | diff --git a/src/operations_center/audit_dispatch/__init__.py b/src/operations_center/audit_dispatch/__init__.py index bf5e4d5a..3eba6248 100644 --- a/src/operations_center/audit_dispatch/__init__.py +++ b/src/operations_center/audit_dispatch/__init__.py @@ -27,11 +27,23 @@ """ from .api import dispatch_managed_audit -from .errors import AuditDispatchConfigError, AuditDispatchError, RepoLockAlreadyHeldError +from .errors import ( + AuditDispatchConfigError, + AuditDispatchError, + LockStoreCorruptError, + RepoLockAlreadyHeldError, + StaleLockReclaimedWarning, +) +from .lock_store import ( + LOCK_SCHEMA_VERSION, + PersistentLockPayload, + PersistentLockStore, +) from .locks import ( 
ManagedRepoAuditLock, ManagedRepoAuditLockRegistry, acquire_audit_lock, + get_global_registry, is_audit_locked, ) from .models import ( @@ -40,18 +52,27 @@ ManagedAuditDispatchRequest, ManagedAuditDispatchResult, ) +from .watcher import RunStatusSnapshot, poll_run_status __all__ = [ "dispatch_managed_audit", "AuditDispatchError", "AuditDispatchConfigError", "RepoLockAlreadyHeldError", + "LockStoreCorruptError", + "StaleLockReclaimedWarning", + "LOCK_SCHEMA_VERSION", + "PersistentLockPayload", + "PersistentLockStore", "ManagedRepoAuditLock", "ManagedRepoAuditLockRegistry", "acquire_audit_lock", + "get_global_registry", "is_audit_locked", "DispatchStatus", "FailureKind", "ManagedAuditDispatchRequest", "ManagedAuditDispatchResult", + "RunStatusSnapshot", + "poll_run_status", ] diff --git a/src/operations_center/audit_dispatch/api.py b/src/operations_center/audit_dispatch/api.py index 2bea2085..c15b0a93 100644 --- a/src/operations_center/audit_dispatch/api.py +++ b/src/operations_center/audit_dispatch/api.py @@ -169,12 +169,26 @@ def dispatch_managed_audit( ) executor = ManagedAuditExecutor(effective_log_dir) - # Step 3: Acquire per-repo lock — raises if already held. - lock = acquire_audit_lock(request.repo_id) + # Step 3: Acquire per-repo lock with full identity payload — raises if held. + # The actual run_status.json path is not known until VF creates the per-run + # bucket dir (named with run_id as a suffix). Persist the parent output dir + # so external observers can discover the bucket via the same logic as lifecycle.py. 
+ expected_output_dir_abs = str( + (Path(working_dir_abs) / invocation.expected_output_dir).resolve() + ) + lock = acquire_audit_lock( + request.repo_id, + run_id=run_id, + audit_type=request.audit_type, + oc_pid=os.getpid(), + command=invocation.command, + expected_run_status_path=expected_output_dir_abs, + ) try: proc_result = executor.execute( invocation, timeout_seconds=request.timeout_seconds, + on_spawn=lock.update_audit_pid, ) finally: lock.release() diff --git a/src/operations_center/audit_dispatch/errors.py b/src/operations_center/audit_dispatch/errors.py index c814f6d6..b5015e03 100644 --- a/src/operations_center/audit_dispatch/errors.py +++ b/src/operations_center/audit_dispatch/errors.py @@ -42,3 +42,19 @@ class AuditDispatchConfigError(AuditDispatchError): These are not operational failures — they indicate the dispatch was incorrectly configured and must be fixed before retrying. """ + + +class LockStoreCorruptError(AuditDispatchError): + """A persistent lock file exists but is malformed (bad JSON or schema). + + Recoverable via ``operations-center-audit unlock --repo X --force`` after + operator inspection. + """ + + +class StaleLockReclaimedWarning(UserWarning): + """Emitted when a stale persistent lock is reclaimed during normal dispatch. + + A warning rather than an error — reclaim is the intended recovery path + after an OpsCenter crash. 
+ """ diff --git a/src/operations_center/audit_dispatch/executor.py b/src/operations_center/audit_dispatch/executor.py index 7c66bbe4..558abb02 100644 --- a/src/operations_center/audit_dispatch/executor.py +++ b/src/operations_center/audit_dispatch/executor.py @@ -15,6 +15,7 @@ import shlex import signal import subprocess +from collections.abc import Callable from dataclasses import dataclass from datetime import UTC, datetime from pathlib import Path @@ -65,6 +66,7 @@ def execute( *, timeout_seconds: float | None = None, cwd_override: str | None = None, + on_spawn: Callable[[int, int | None], None] | None = None, ) -> ProcessResult: """Execute the managed audit command. @@ -104,6 +106,18 @@ def execute( open(stderr_path, "w", encoding="utf-8", errors="replace") as err_f, ): proc = subprocess.Popen(args, stdout=out_f, stderr=err_f, **popen_kwargs) + if on_spawn is not None: + pgid: int | None = None + if platform.system() != "Windows": + try: + pgid = os.getpgid(proc.pid) + except (ProcessLookupError, OSError): + pgid = None + try: + on_spawn(proc.pid, pgid) + except Exception: + # on_spawn must never break dispatch — best-effort hook. + pass try: proc.wait(timeout=timeout_seconds) except subprocess.TimeoutExpired: diff --git a/src/operations_center/audit_dispatch/lock_store.py b/src/operations_center/audit_dispatch/lock_store.py new file mode 100644 index 00000000..f772c32a --- /dev/null +++ b/src/operations_center/audit_dispatch/lock_store.py @@ -0,0 +1,336 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +# Copyright (C) 2026 Velascat +"""Crash-safe on-disk lock store for managed-repo audit dispatch. + +Each held lock is one JSON file at ``state/audit_dispatch/locks/{repo_id}.lock`` +written atomically (tempfile + ``os.replace``). Cross-process exclusion is +enforced via ``fcntl.flock`` on a sentinel file beside the payload, using the +existing ``audit_governance.file_locks.locked_state_file`` helper. 
+ +Liveness model (dual-PID) +------------------------- +A held lock records two PIDs: + +* ``oc_pid`` — the OperationsCenter process that dispatched the audit. +* ``audit_pid`` — the audit subprocess PID, populated once the executor has + called ``Popen``. Until then it is ``None``. + +A lock is considered **stale** (eligible for reclaim) iff *all* recorded PIDs +are dead. This means an audit subprocess orphaned by an OpsCenter crash holds +the lock until it exits, which is the correct behavior — we must not dispatch +a second audit while the first is still writing artifacts. + +POSIX-only — uses ``os.kill(pid, 0)`` and ``fcntl``. +""" + +from __future__ import annotations + +import errno +import json +import os +import socket +import tempfile +from dataclasses import dataclass, field, replace +from datetime import UTC, datetime +from pathlib import Path +from typing import Any + +from .errors import ( + LockStoreCorruptError, + RepoLockAlreadyHeldError, + StaleLockReclaimedWarning, +) + + +def _locked_state_file(path: Path): + """Lazy import shim — avoids circular import via audit_governance package init.""" + from operations_center.audit_governance.file_locks import locked_state_file + return locked_state_file(path) + +LOCK_SCHEMA_VERSION = 1 + + +def _now_iso() -> str: + return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z") + + +def _is_pid_alive(pid: int | None) -> bool: + """Return True if a process with this PID exists and we can signal it. + + Uses ``os.kill(pid, 0)`` — sends no signal, but raises if the PID is + unknown (ESRCH) or unreachable (EPERM means it exists but we lack + permission, so we conservatively consider it alive). 
+ """ + if pid is None or pid <= 0: + return False + try: + os.kill(pid, 0) + except ProcessLookupError: + return False + except PermissionError: + return True + except OSError as exc: + if exc.errno == errno.ESRCH: + return False + return True + return True + + +@dataclass(frozen=True, slots=True) +class PersistentLockPayload: + """Serialized contents of one ``{repo_id}.lock`` file. + + Frozen so the in-process registry can hand callers a snapshot without + risk of mutation. To update fields on disk, call + ``PersistentLockStore.update(repo_id, audit_pid=...)``. + """ + + repo_id: str + run_id: str + audit_type: str + oc_pid: int + started_at: str + command: str + expected_run_status_path: str + audit_pid: int | None = None + audit_pgid: int | None = None + owner_hostname: str = field(default_factory=socket.gethostname) + lock_schema_version: int = LOCK_SCHEMA_VERSION + + def to_json(self) -> dict[str, Any]: + return { + "lock_schema_version": self.lock_schema_version, + "repo_id": self.repo_id, + "run_id": self.run_id, + "audit_type": self.audit_type, + "oc_pid": self.oc_pid, + "audit_pid": self.audit_pid, + "audit_pgid": self.audit_pgid, + "started_at": self.started_at, + "command": self.command, + "expected_run_status_path": self.expected_run_status_path, + "owner_hostname": self.owner_hostname, + } + + @classmethod + def from_json(cls, data: dict[str, Any]) -> "PersistentLockPayload": + try: + return cls( + repo_id=str(data["repo_id"]), + run_id=str(data["run_id"]), + audit_type=str(data["audit_type"]), + oc_pid=int(data["oc_pid"]), + started_at=str(data["started_at"]), + command=str(data["command"]), + expected_run_status_path=str(data["expected_run_status_path"]), + audit_pid=( + int(data["audit_pid"]) + if data.get("audit_pid") is not None + else None + ), + audit_pgid=( + int(data["audit_pgid"]) + if data.get("audit_pgid") is not None + else None + ), + owner_hostname=str(data.get("owner_hostname", socket.gethostname())), + lock_schema_version=int( + 
data.get("lock_schema_version", LOCK_SCHEMA_VERSION) + ), + ) + except (KeyError, ValueError, TypeError) as exc: + raise LockStoreCorruptError( + f"malformed lock payload: missing/invalid field ({exc})" + ) from exc + + def is_alive(self) -> bool: + """True if any recorded PID is alive (OC or audit subprocess).""" + return _is_pid_alive(self.oc_pid) or _is_pid_alive(self.audit_pid) + + def liveness_summary(self) -> dict[str, bool]: + return { + "oc_pid_alive": _is_pid_alive(self.oc_pid), + "audit_pid_alive": _is_pid_alive(self.audit_pid), + } + + +class PersistentLockStore: + """File-backed lock store at ``state/audit_dispatch/locks/``. + + Methods are safe for concurrent use across both threads (in-process) and + OS processes (via ``fcntl.flock`` sentinel files). + """ + + def __init__(self, state_dir: Path) -> None: + self._state_dir = Path(state_dir) + self._state_dir.mkdir(parents=True, exist_ok=True) + + @property + def state_dir(self) -> Path: + return self._state_dir + + def _lock_path(self, repo_id: str) -> Path: + return self._state_dir / f"{repo_id}.lock" + + # ------------------------------------------------------------------ + # Read + # ------------------------------------------------------------------ + + def read(self, repo_id: str) -> PersistentLockPayload | None: + """Return the payload for repo_id, or None if no lock file exists. + + Raises LockStoreCorruptError if the file exists but is malformed. + """ + path = self._lock_path(repo_id) + if not path.exists(): + return None + try: + data = json.loads(path.read_text(encoding="utf-8")) + except json.JSONDecodeError as exc: + raise LockStoreCorruptError( + f"lock file {path} is not valid JSON: {exc}" + ) from exc + return PersistentLockPayload.from_json(data) + + def _iter_lock_files(self) -> list[Path]: + """Yield only first-tier ``{repo_id}.lock`` files, never sentinels. + + ``locked_state_file`` creates ``{name}.lock`` sentinels next to each + target file. 
A naive ``glob("*.lock")`` would match those too and + cause recursive sentinel creation. We filter to filenames whose stem + contains no dot (i.e., ``foo.lock`` matches but ``foo.lock.lock`` does not). + """ + out: list[Path] = [] + if not self._state_dir.exists(): + return out + for path in sorted(self._state_dir.glob("*.lock")): + if "." in path.stem: + continue + out.append(path) + return out + + def list_active(self) -> list[PersistentLockPayload]: + """All currently-held locks (corrupt files are skipped, not raised).""" + out: list[PersistentLockPayload] = [] + for path in self._iter_lock_files(): + try: + data = json.loads(path.read_text(encoding="utf-8")) + out.append(PersistentLockPayload.from_json(data)) + except (json.JSONDecodeError, LockStoreCorruptError): + continue + return out + + # ------------------------------------------------------------------ + # Acquire / release / update + # ------------------------------------------------------------------ + + def try_acquire(self, payload: PersistentLockPayload) -> PersistentLockPayload: + """Acquire the lock for payload.repo_id and write payload to disk. + + Raises + ------ + RepoLockAlreadyHeldError + If another live lock already exists for this repo. The held + payload is included on the exception via ``.held_payload``. + """ + path = self._lock_path(payload.repo_id) + with _locked_state_file(path): + existing = self.read(payload.repo_id) + if existing is not None and existing.is_alive(): + err = RepoLockAlreadyHeldError( + f"managed audit lock for repo {payload.repo_id!r} is held by " + f"run {existing.run_id} (oc_pid={existing.oc_pid}, " + f"audit_pid={existing.audit_pid})" + ) + # Stash for diagnostics — CLI surfaces this. + err.held_payload = existing # type: ignore[attr-defined] + raise err + self._write_atomic(path, payload) + return payload + + def release(self, repo_id: str) -> bool: + """Remove the lock file for repo_id. 
Idempotent; True if removed.""" + path = self._lock_path(repo_id) + with _locked_state_file(path): + if not path.exists(): + return False + path.unlink() + return True + + def update(self, repo_id: str, **changes: Any) -> PersistentLockPayload: + """Atomically update specific fields on the held lock payload. + + Used to record ``audit_pid`` and ``audit_pgid`` after the executor + has spawned the subprocess. Raises ``FileNotFoundError`` if the lock + is not currently held. + """ + path = self._lock_path(repo_id) + with _locked_state_file(path): + existing = self.read(repo_id) + if existing is None: + raise FileNotFoundError( + f"no lock to update for repo {repo_id!r}" + ) + updated = replace(existing, **changes) + self._write_atomic(path, updated) + return updated + + def reclaim_if_stale(self, repo_id: str) -> bool: + """Release the lock if all recorded PIDs are dead. True if reclaimed.""" + path = self._lock_path(repo_id) + with _locked_state_file(path): + try: + existing = self.read(repo_id) + except LockStoreCorruptError: + # Corrupt file is treated as stale (we can't verify it, + # but blocking dispatch on a corrupt file is worse). + path.unlink(missing_ok=True) + return True + if existing is None: + return False + if existing.is_alive(): + return False + path.unlink(missing_ok=True) + return True + + def sweep_stale(self) -> list[str]: + """Reclaim every stale lock under state_dir. 
Returns repo_ids reclaimed.""" + reclaimed: list[str] = [] + for path in self._iter_lock_files(): + repo_id = path.stem + if self.reclaim_if_stale(repo_id): + reclaimed.append(repo_id) + return reclaimed + + # ------------------------------------------------------------------ + # internals + # ------------------------------------------------------------------ + + def _write_atomic(self, path: Path, payload: PersistentLockPayload) -> None: + """Write payload as JSON via tempfile + os.replace (atomic on POSIX).""" + path.parent.mkdir(parents=True, exist_ok=True) + text = json.dumps(payload.to_json(), indent=2, ensure_ascii=False) + fd, tmp_path = tempfile.mkstemp( + prefix=f"{path.name}.", + suffix=".tmp", + dir=str(path.parent), + ) + try: + with os.fdopen(fd, "w", encoding="utf-8") as f: + f.write(text) + f.write("\n") + os.replace(tmp_path, path) + except Exception: + try: + os.unlink(tmp_path) + except OSError: + pass + raise + + +__all__ = [ + "LOCK_SCHEMA_VERSION", + "PersistentLockPayload", + "PersistentLockStore", + "StaleLockReclaimedWarning", +] diff --git a/src/operations_center/audit_dispatch/locks.py b/src/operations_center/audit_dispatch/locks.py index df1847b3..c51515cd 100644 --- a/src/operations_center/audit_dispatch/locks.py +++ b/src/operations_center/audit_dispatch/locks.py @@ -1,50 +1,92 @@ # SPDX-License-Identifier: AGPL-3.0-or-later # Copyright (C) 2026 Velascat -"""In-memory one-audit-per-repo lock registry. +"""One-audit-per-repo lock registry, backed by a crash-safe persistent store. -Policy: at most one managed audit per repo_id may be dispatched at a time. +Two layers: -Crash-safety note: - This implementation holds lock state in-memory. If the OperationsCenter - process crashes while an audit is running, all held locks are dropped on - restart — stale locks do not persist. Process-crash-safe distributed locks - are intentionally out of scope for Phase 6. +* In-process ``threading.Lock`` for thread-safety within one OpsCenter process. 
+* On-disk ``PersistentLockStore`` for cross-process exclusion + crash-safety. + +Public surface (preserved across the Phase 6 refactor): + ``ManagedRepoAuditLock``, ``ManagedRepoAuditLockRegistry``, + ``acquire_audit_lock(repo_id, ...)``, ``is_audit_locked(repo_id)``. + +The legacy zero-arg ``acquire_audit_lock(repo_id)`` form is preserved for +unit tests that don't carry identity context — it synthesizes a payload +identifying the OpsCenter PID only. """ from __future__ import annotations +import os import threading +from pathlib import Path from types import TracebackType from .errors import RepoLockAlreadyHeldError +from .lock_store import PersistentLockPayload, PersistentLockStore + +# OpsCenter root: locks live at /state/audit_dispatch/locks/. +# locks.py is at: src/operations_center/audit_dispatch/locks.py +# parents[0] = audit_dispatch/, [1] = operations_center/, [2] = src/, [3] = OC root +_OC_ROOT = Path(__file__).resolve().parents[3] +_DEFAULT_STATE_DIR = _OC_ROOT / "state" / "audit_dispatch" / "locks" + + +def _now_iso() -> str: + from datetime import UTC, datetime + return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z") class ManagedRepoAuditLock: """Context manager for a single-repo audit lock. - Acquired via ManagedRepoAuditLockRegistry.acquire(). - Release is idempotent — double-releases are silently ignored. + Acquired via ``ManagedRepoAuditLockRegistry.acquire()``. Release is + idempotent — double-releases are silently ignored. Use as a context manager (preferred) or call release() explicitly: - lock = acquire_audit_lock("videofoundry") + lock = acquire_audit_lock("videofoundry", run_id=...) try: ... 
finally: lock.release() """ - __slots__ = ("_repo_id", "_registry", "_released") + __slots__ = ("_repo_id", "_registry", "_released", "_payload") - def __init__(self, repo_id: str, registry: "ManagedRepoAuditLockRegistry") -> None: + def __init__( + self, + repo_id: str, + registry: "ManagedRepoAuditLockRegistry", + payload: PersistentLockPayload | None = None, + ) -> None: self._repo_id = repo_id self._registry = registry self._released = False + self._payload = payload @property def repo_id(self) -> str: return self._repo_id + @property + def payload(self) -> PersistentLockPayload | None: + """The persistent-store payload backing this lock (None for legacy locks).""" + return self._payload + + def update_audit_pid(self, audit_pid: int, audit_pgid: int | None = None) -> None: + """Record the audit subprocess PID after the executor has spawned it. + + Safe to call multiple times. No-op if the lock has no persistent payload + (legacy zero-arg locks). + """ + if self._payload is None: + return + self._payload = self._registry._update_payload( + self._repo_id, audit_pid=audit_pid, audit_pgid=audit_pgid + ) + def release(self) -> None: """Release the lock. Safe to call multiple times.""" if not self._released: @@ -68,49 +110,104 @@ def __repr__(self) -> str: class ManagedRepoAuditLockRegistry: - """Thread-safe in-memory lock registry. + """Thread-safe lock registry backed by a crash-safe ``PersistentLockStore``. - Maintains a set of repo_ids for which an audit is currently dispatched. - Acquiring a lock for a repo_id that is already held raises - RepoLockAlreadyHeldError immediately — no waiting or queueing. + At most one managed audit per ``repo_id`` may be dispatched at a time. + Acquiring a lock for a held ``repo_id`` raises ``RepoLockAlreadyHeldError``. 
""" - def __init__(self) -> None: + def __init__(self, state_dir: Path | None = None) -> None: self._mutex = threading.Lock() self._held: set[str] = set() + self._store = PersistentLockStore(state_dir or _DEFAULT_STATE_DIR) + self._swept = False - def acquire(self, repo_id: str) -> ManagedRepoAuditLock: - """Acquire the lock for repo_id. - - Returns a ManagedRepoAuditLock context manager. + @property + def store(self) -> PersistentLockStore: + return self._store - Raises - ------ - RepoLockAlreadyHeldError - If a managed audit is already dispatched for this repo_id. + def acquire( + self, + repo_id: str, + *, + run_id: str | None = None, + audit_type: str | None = None, + oc_pid: int | None = None, + command: str = "", + expected_run_status_path: str = "", + ) -> ManagedRepoAuditLock: + """Acquire the lock for ``repo_id`` and persist a lock payload. + + Identity parameters are accepted; legacy zero-arg callers receive a + synthesized payload (run_id="legacy", current OC PID). """ + self._sweep_once() with self._mutex: if repo_id in self._held: raise RepoLockAlreadyHeldError( - f"A managed audit is already running for repo '{repo_id}'. " - "Only one audit per repo is permitted at a time. " - "Wait for the in-progress audit to complete before dispatching another." + f"A managed audit is already running for repo '{repo_id}' " + "in this OpsCenter process. Only one audit per repo is " + "permitted at a time. Wait for the in-progress audit to " + "complete before dispatching another." ) + payload = PersistentLockPayload( + repo_id=repo_id, + run_id=run_id or "legacy", + audit_type=audit_type or "unknown", + oc_pid=oc_pid or os.getpid(), + started_at=_now_iso(), + command=command, + expected_run_status_path=expected_run_status_path, + ) + # try_acquire raises RepoLockAlreadyHeldError if disk lock is held. 
+ self._store.try_acquire(payload) self._held.add(repo_id) - return ManagedRepoAuditLock(repo_id, self) + return ManagedRepoAuditLock(repo_id, self, payload=payload) def _release(self, repo_id: str) -> None: with self._mutex: self._held.discard(repo_id) + try: + self._store.release(repo_id) + except Exception: + # Release must never raise — best-effort cleanup. + pass + + def _update_payload( + self, + repo_id: str, + *, + audit_pid: int | None = None, + audit_pgid: int | None = None, + ) -> PersistentLockPayload: + return self._store.update( + repo_id, audit_pid=audit_pid, audit_pgid=audit_pgid + ) + + def _sweep_once(self) -> None: + """Reclaim stale locks once on first use of this registry.""" + if self._swept: + return + try: + self._store.sweep_stale() + finally: + self._swept = True def is_held(self, repo_id: str) -> bool: - """Return True if an audit is currently dispatched for repo_id.""" + """True iff a live persistent lock exists for ``repo_id``.""" with self._mutex: - return repo_id in self._held + if repo_id in self._held: + return True + # Allow another OpsCenter process holding the disk lock to count as held. + try: + existing = self._store.read(repo_id) + except Exception: + return False + return existing is not None and existing.is_alive() @property def held_repos(self) -> frozenset[str]: - """Snapshot of all currently locked repo_ids.""" + """Snapshot of all locked repo_ids in this OpsCenter process.""" with self._mutex: return frozenset(self._held) @@ -119,14 +216,36 @@ def held_repos(self) -> frozenset[str]: _GLOBAL_REGISTRY = ManagedRepoAuditLockRegistry() -def acquire_audit_lock(repo_id: str) -> ManagedRepoAuditLock: +def acquire_audit_lock( + repo_id: str, + *, + run_id: str | None = None, + audit_type: str | None = None, + oc_pid: int | None = None, + command: str = "", + expected_run_status_path: str = "", +) -> ManagedRepoAuditLock: """Acquire the global per-repo audit lock. 
- Raises RepoLockAlreadyHeldError if the repo already has an audit dispatched. + Raises ``RepoLockAlreadyHeldError`` if the repo already has an audit + dispatched (in this process or in another OpsCenter process holding the + persistent disk lock). """ - return _GLOBAL_REGISTRY.acquire(repo_id) + return _GLOBAL_REGISTRY.acquire( + repo_id, + run_id=run_id, + audit_type=audit_type, + oc_pid=oc_pid, + command=command, + expected_run_status_path=expected_run_status_path, + ) def is_audit_locked(repo_id: str) -> bool: - """Return True if a managed audit is currently dispatched for repo_id.""" + """True if a managed audit is currently dispatched for ``repo_id``.""" return _GLOBAL_REGISTRY.is_held(repo_id) + + +def get_global_registry() -> ManagedRepoAuditLockRegistry: + """Return the process-scoped global registry (for CLI / introspection).""" + return _GLOBAL_REGISTRY diff --git a/src/operations_center/audit_dispatch/watcher.py b/src/operations_center/audit_dispatch/watcher.py new file mode 100644 index 00000000..13a8fa0c --- /dev/null +++ b/src/operations_center/audit_dispatch/watcher.py @@ -0,0 +1,104 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +# Copyright (C) 2026 Velascat +"""In-flight ``run_status.json`` polling for managed audit dispatch (Phase 6, Slice F). + +Used by the ``operations-center-audit watch`` CLI to observe an in-progress +managed audit's lifecycle in real time. Polling-based — no ``watchdog`` +dependency added to the runtime. + +The audit's run_status.json lives at ``<bucket_dir>/run_status.json`` where +the bucket directory's name encodes the ``run_id`` as a suffix (matching VF's +report-naming convention). This module finds the bucket by scanning the +expected output dir for a child directory whose name contains ``run_id``.
+""" + +from __future__ import annotations + +import json +import time +from collections.abc import Iterator +from dataclasses import dataclass +from pathlib import Path + +from operations_center.audit_contracts.vocabulary import RunStatus + + +_TERMINAL_STATUSES = { + RunStatus.COMPLETED.value, + RunStatus.FAILED.value, + RunStatus.INTERRUPTED.value, +} + + +@dataclass(frozen=True, slots=True) +class RunStatusSnapshot: + """One observed transition of ``run_status.json`` content.""" + + path: Path + status: str + current_phase: str | None + raw: dict + is_terminal: bool + + +def _find_bucket_dir(expected_output_dir: Path, run_id: str) -> Path | None: + """Locate the per-run bucket directory whose name contains ``run_id``.""" + if not expected_output_dir.is_dir(): + return None + for child in sorted(expected_output_dir.iterdir(), reverse=True): + if child.is_dir() and run_id in child.name: + return child + return None + + +def _read_status(path: Path) -> dict | None: + if not path.exists(): + return None + try: + return json.loads(path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + return None + + +def poll_run_status( + expected_output_dir: Path, + run_id: str, + *, + poll_interval_s: float = 2.0, + timeout_s: float | None = None, +) -> Iterator[RunStatusSnapshot]: + """Yield each time the bucket's ``run_status.json`` content changes. + + Stops yielding when the most-recent snapshot's ``status`` is terminal + (``completed``, ``failed``, ``interrupted``) or when ``timeout_s`` elapses. + + Yields nothing if the bucket dir never appears within the timeout. 
+ """ + deadline = time.monotonic() + timeout_s if timeout_s is not None else None + last_payload: dict | None = None + + while True: + bucket = _find_bucket_dir(expected_output_dir, run_id) + if bucket is not None: + status_path = bucket / "run_status.json" + payload = _read_status(status_path) + if payload is not None and payload != last_payload: + last_payload = payload + status = str(payload.get("status", "unknown")) + snapshot = RunStatusSnapshot( + path=status_path, + status=status, + current_phase=payload.get("current_phase"), + raw=payload, + is_terminal=status in _TERMINAL_STATUSES, + ) + yield snapshot + if snapshot.is_terminal: + return + + if deadline is not None and time.monotonic() >= deadline: + return + time.sleep(poll_interval_s) + + +__all__ = ["RunStatusSnapshot", "poll_run_status"] diff --git a/src/operations_center/entrypoints/audit/main.py b/src/operations_center/entrypoints/audit/main.py index 18f63fd4..888b6e70 100644 --- a/src/operations_center/entrypoints/audit/main.py +++ b/src/operations_center/entrypoints/audit/main.py @@ -39,6 +39,7 @@ RepoLockAlreadyHeldError, dispatch_managed_audit, ) +from operations_center.audit_dispatch.locks import get_global_registry from operations_center.audit_toolset import ( ArtifactManifestPathMissingError, RunStatusContractError, @@ -159,6 +160,177 @@ def cmd_resolve_manifest( typer.echo(str(manifest_path)) +@app.command("dispatch") +def cmd_dispatch( + repo_id: str = typer.Argument(..., help="Managed repo ID."), + audit_type: str = typer.Argument(..., help="Audit type."), + allow_unverified: bool = typer.Option(False, "--allow-unverified"), + timeout: float | None = typer.Option(None, "--timeout"), + requested_by: str | None = typer.Option(None, "--requested-by"), + log_dir: str | None = typer.Option(None, "--log-dir"), + json_output: bool = typer.Option(False, "--json"), +) -> None: + """Positional alias for ``run`` (matches the Phase 6 spec invocation form).""" + cmd_run( + repo=repo_id, + 
audit_type=audit_type, + allow_unverified=allow_unverified, + timeout=timeout, + requested_by=requested_by, + log_dir=log_dir, + json_output=json_output, + ) + + +@app.command("list-active") +def cmd_list_active( + json_output: bool = typer.Option(False, "--json", help="Output as JSON."), +) -> None: + """List currently-held audit dispatch locks across all OpsCenter processes.""" + import json as _json + from datetime import UTC, datetime + + store = get_global_registry().store + active = store.list_active() + + if json_output: + typer.echo( + _json.dumps( + [ + { + **p.to_json(), + **p.liveness_summary(), + } + for p in active + ], + indent=2, + ) + ) + return + + if not active: + console.print("[dim]no active audit locks[/dim]") + return + + t = Table(title="Active Audit Locks") + t.add_column("repo_id") + t.add_column("audit_type") + t.add_column("run_id") + t.add_column("oc_pid") + t.add_column("audit_pid") + t.add_column("liveness") + t.add_column("started_at") + t.add_column("age") + t.add_column("expected_output_dir") + now = datetime.now(UTC) + for p in active: + liveness = p.liveness_summary() + live_str = ( + f"oc={'✓' if liveness['oc_pid_alive'] else '✗'} " + f"audit={'✓' if liveness['audit_pid_alive'] else '✗'}" + ) + try: + started = datetime.fromisoformat(p.started_at.replace("Z", "+00:00")) + age = f"{(now - started).total_seconds():.0f}s" + except ValueError: + age = "?" + t.add_row( + p.repo_id, + p.audit_type, + p.run_id, + str(p.oc_pid), + str(p.audit_pid) if p.audit_pid is not None else "—", + live_str, + p.started_at, + age, + p.expected_run_status_path, + ) + console.print(t) + + +@app.command("watch") +def cmd_watch( + repo: str = typer.Option(..., "--repo", "-r", help="Managed repo ID."), + poll_interval: float = typer.Option(2.0, "--interval", help="Poll interval in seconds."), + timeout: float | None = typer.Option( + None, "--timeout", help="Stop watching after this many seconds." 
+ ), +) -> None: + """Stream run_status.json transitions for the audit currently held by ``repo``. + + Reads the held lock to discover ``expected_run_status_path`` (the parent + output dir) and ``run_id``, then polls the bucket for status changes. + """ + from pathlib import Path as _Path + + from operations_center.audit_dispatch.watcher import poll_run_status + + store = get_global_registry().store + payload = store.read(repo) + if payload is None: + console.print(f"[yellow]No audit lock held for {repo!r}.[/yellow]") + raise typer.Exit(code=1) + + output_dir = _Path(payload.expected_run_status_path) + console.print( + f"[dim]watching run_id={payload.run_id} under {output_dir}[/dim]" + ) + for snapshot in poll_run_status( + output_dir, + payload.run_id, + poll_interval_s=poll_interval, + timeout_s=timeout, + ): + console.print( + f"[bold]{snapshot.status}[/bold] " + f"phase={snapshot.current_phase or '—'} " + f"path={snapshot.path}" + ) + if snapshot.is_terminal: + break + + +@app.command("unlock") +def cmd_unlock( + repo: str = typer.Option(..., "--repo", "-r", help="Managed repo ID to unlock."), + force: bool = typer.Option( + False, + "--force", + help="Force-release even if a recorded PID is still alive.", + ), +) -> None: + """Release a held audit dispatch lock for ``repo``. + + Without ``--force``, the lock is only released if all recorded PIDs are + dead (i.e., the lock is genuinely stale). With ``--force``, the lock is + released regardless — use only when an operator has confirmed the held + audit is not running. 
+ """ + store = get_global_registry().store + payload = store.read(repo) + if payload is None: + console.print(f"[yellow]No lock held for repo {repo!r}.[/yellow]") + raise typer.Exit(code=0) + + if force: + store.release(repo) + console.print(f"[green]Force-released lock for {repo}.[/green]") + raise typer.Exit(code=0) + + if payload.is_alive(): + liveness = payload.liveness_summary() + console.print( + f"[red]Lock for {repo!r} is still alive[/red] " + f"(oc_pid={payload.oc_pid} alive={liveness['oc_pid_alive']}, " + f"audit_pid={payload.audit_pid} alive={liveness['audit_pid_alive']}). " + "Re-run with --force to override." + ) + raise typer.Exit(code=1) + + store.release(repo) + console.print(f"[green]Released stale lock for {repo}.[/green]") + + def _print_dispatch_result(result) -> None: status_style = "green" if result.succeeded else "red" t = Table(title=f"Dispatch Result — {result.repo_id}/{result.audit_type}") diff --git a/tests/unit/audit_dispatch/test_lock_store.py b/tests/unit/audit_dispatch/test_lock_store.py new file mode 100644 index 00000000..9b7310e9 --- /dev/null +++ b/tests/unit/audit_dispatch/test_lock_store.py @@ -0,0 +1,217 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +# Copyright (C) 2026 Velascat +"""Tests for the persistent lock store (Phase 6, Slice A).""" + +from __future__ import annotations + +import json +import os +import subprocess +import sys +from pathlib import Path + +import pytest + +from operations_center.audit_dispatch.errors import ( + LockStoreCorruptError, + RepoLockAlreadyHeldError, +) +from operations_center.audit_dispatch.lock_store import ( + LOCK_SCHEMA_VERSION, + PersistentLockPayload, + PersistentLockStore, +) + + +def _payload( + *, + repo_id: str = "videofoundry", + run_id: str = "videofoundry_representative_20260504T120000Z_aabb1122", + audit_type: str = "representative", + oc_pid: int | None = None, + audit_pid: int | None = None, + audit_pgid: int | None = None, +) -> PersistentLockPayload: + return 
PersistentLockPayload( + repo_id=repo_id, + run_id=run_id, + audit_type=audit_type, + oc_pid=oc_pid if oc_pid is not None else os.getpid(), + audit_pid=audit_pid, + audit_pgid=audit_pgid, + started_at="2026-05-04T12:00:00Z", + command="python -m tools.audit.run_representative_audit", + expected_run_status_path="/tmp/run_status.json", + ) + + +def _spawn_short_subprocess() -> subprocess.Popen: + """Spawn a Python child that exits quickly. Used to obtain a known PID.""" + return subprocess.Popen([sys.executable, "-c", "pass"]) + + +class TestAcquireRelease: + def test_acquire_writes_lock_file(self, tmp_path: Path) -> None: + store = PersistentLockStore(tmp_path) + store.try_acquire(_payload()) + assert (tmp_path / "videofoundry.lock").exists() + + def test_lock_file_contains_payload_fields(self, tmp_path: Path) -> None: + store = PersistentLockStore(tmp_path) + store.try_acquire(_payload(audit_pid=12345, audit_pgid=12345)) + data = json.loads((tmp_path / "videofoundry.lock").read_text()) + assert data["repo_id"] == "videofoundry" + assert data["audit_type"] == "representative" + assert data["audit_pid"] == 12345 + assert data["audit_pgid"] == 12345 + assert data["lock_schema_version"] == LOCK_SCHEMA_VERSION + + def test_release_removes_file(self, tmp_path: Path) -> None: + store = PersistentLockStore(tmp_path) + store.try_acquire(_payload()) + assert store.release("videofoundry") is True + assert not (tmp_path / "videofoundry.lock").exists() + + def test_release_idempotent(self, tmp_path: Path) -> None: + store = PersistentLockStore(tmp_path) + assert store.release("videofoundry") is False # never held + + def test_double_acquire_raises_when_oc_pid_alive(self, tmp_path: Path) -> None: + store = PersistentLockStore(tmp_path) + store.try_acquire(_payload()) # oc_pid = current process — alive + with pytest.raises(RepoLockAlreadyHeldError) as ei: + store.try_acquire(_payload()) + # Held payload is attached for diagnostics. 
+ assert ei.value.held_payload.repo_id == "videofoundry" # type: ignore[attr-defined] + + def test_acquire_succeeds_when_held_lock_is_dead(self, tmp_path: Path) -> None: + store = PersistentLockStore(tmp_path) + # Spawn + reap a child; its PID is now guaranteed dead. + proc = _spawn_short_subprocess() + proc.wait() + dead_pid = proc.pid + # Manually plant a lock with a dead PID — bypass try_acquire's liveness check. + store._write_atomic( + tmp_path / "videofoundry.lock", + _payload(oc_pid=dead_pid), + ) + # Now acquiring should succeed because the held lock's only PID is dead. + store.try_acquire(_payload()) + assert (tmp_path / "videofoundry.lock").exists() + + def test_acquire_blocked_when_audit_pid_alive_even_if_oc_dead(self, tmp_path: Path) -> None: + store = PersistentLockStore(tmp_path) + proc = _spawn_short_subprocess() + proc.wait() + dead_pid = proc.pid + # OC PID dead but audit subprocess alive (= current process). + store._write_atomic( + tmp_path / "videofoundry.lock", + _payload(oc_pid=dead_pid, audit_pid=os.getpid()), + ) + with pytest.raises(RepoLockAlreadyHeldError): + store.try_acquire(_payload()) + + +class TestUpdate: + def test_update_audit_pid_atomically(self, tmp_path: Path) -> None: + store = PersistentLockStore(tmp_path) + store.try_acquire(_payload()) + updated = store.update("videofoundry", audit_pid=4242, audit_pgid=4242) + assert updated.audit_pid == 4242 + assert updated.audit_pgid == 4242 + on_disk = store.read("videofoundry") + assert on_disk is not None and on_disk.audit_pid == 4242 + + def test_update_missing_lock_raises(self, tmp_path: Path) -> None: + store = PersistentLockStore(tmp_path) + with pytest.raises(FileNotFoundError): + store.update("videofoundry", audit_pid=99) + + +class TestStaleReclaim: + def test_reclaim_if_stale_when_pids_dead(self, tmp_path: Path) -> None: + store = PersistentLockStore(tmp_path) + proc = _spawn_short_subprocess() + proc.wait() + store._write_atomic( + tmp_path / "videofoundry.lock", + 
_payload(oc_pid=proc.pid), + ) + assert store.reclaim_if_stale("videofoundry") is True + assert not (tmp_path / "videofoundry.lock").exists() + + def test_reclaim_returns_false_when_alive(self, tmp_path: Path) -> None: + store = PersistentLockStore(tmp_path) + store.try_acquire(_payload()) # oc_pid = current = alive + assert store.reclaim_if_stale("videofoundry") is False + assert (tmp_path / "videofoundry.lock").exists() + + def test_reclaim_corrupt_lock_treated_as_stale(self, tmp_path: Path) -> None: + store = PersistentLockStore(tmp_path) + (tmp_path / "videofoundry.lock").write_text("{not valid json") + assert store.reclaim_if_stale("videofoundry") is True + + def test_sweep_stale_returns_reclaimed_repos(self, tmp_path: Path) -> None: + store = PersistentLockStore(tmp_path) + proc = _spawn_short_subprocess() + proc.wait() + store._write_atomic(tmp_path / "repo_a.lock", _payload(repo_id="repo_a", oc_pid=proc.pid)) + store._write_atomic(tmp_path / "repo_b.lock", _payload(repo_id="repo_b", oc_pid=os.getpid())) + reclaimed = store.sweep_stale() + assert reclaimed == ["repo_a"] + assert (tmp_path / "repo_b.lock").exists() + + +class TestRead: + def test_read_returns_none_when_missing(self, tmp_path: Path) -> None: + store = PersistentLockStore(tmp_path) + assert store.read("nonexistent") is None + + def test_read_corrupt_raises(self, tmp_path: Path) -> None: + store = PersistentLockStore(tmp_path) + (tmp_path / "x.lock").write_text("{not valid json") + with pytest.raises(LockStoreCorruptError): + store.read("x") + + def test_read_missing_field_raises(self, tmp_path: Path) -> None: + store = PersistentLockStore(tmp_path) + (tmp_path / "x.lock").write_text(json.dumps({"repo_id": "x"})) + with pytest.raises(LockStoreCorruptError): + store.read("x") + + def test_list_active_returns_all_locks(self, tmp_path: Path) -> None: + store = PersistentLockStore(tmp_path) + store.try_acquire(_payload(repo_id="repo_a")) + store.try_acquire(_payload(repo_id="repo_b")) + active = 
store.list_active() + assert {p.repo_id for p in active} == {"repo_a", "repo_b"} + + def test_list_active_skips_corrupt(self, tmp_path: Path) -> None: + store = PersistentLockStore(tmp_path) + store.try_acquire(_payload(repo_id="repo_a")) + (tmp_path / "broken.lock").write_text("not json") + active = store.list_active() + assert {p.repo_id for p in active} == {"repo_a"} + + +class TestPayloadLiveness: + def test_is_alive_when_oc_pid_alive(self) -> None: + p = _payload(oc_pid=os.getpid(), audit_pid=None) + assert p.is_alive() is True + + def test_is_alive_when_audit_pid_alive(self) -> None: + p = _payload(oc_pid=99999999, audit_pid=os.getpid()) + assert p.is_alive() is True + + def test_is_dead_when_both_pids_dead(self) -> None: + proc = _spawn_short_subprocess() + proc.wait() + p = _payload(oc_pid=proc.pid, audit_pid=None) + assert p.is_alive() is False + + def test_liveness_summary_shape(self) -> None: + p = _payload(oc_pid=os.getpid(), audit_pid=None) + summary = p.liveness_summary() + assert summary["oc_pid_alive"] is True + assert summary["audit_pid_alive"] is False diff --git a/tests/unit/audit_dispatch/test_lock_store_concurrency.py b/tests/unit/audit_dispatch/test_lock_store_concurrency.py new file mode 100644 index 00000000..173924b1 --- /dev/null +++ b/tests/unit/audit_dispatch/test_lock_store_concurrency.py @@ -0,0 +1,132 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +# Copyright (C) 2026 Velascat +"""Cross-process concurrency proof for the persistent lock store (Phase 6, Slice E). + +Spawns two real Python subprocesses competing for the same repo lock and +asserts that exactly one acquires it. This is the integration test for +``fcntl.flock``-based mutual exclusion across OS processes. 
+""" + +from __future__ import annotations + +import json +import subprocess +import sys +import textwrap +from pathlib import Path + + +_SUBPROCESS_SCRIPT = textwrap.dedent( + """ + import json + import os + import socket + import sys + import time + from pathlib import Path + + state_dir = Path(sys.argv[1]) + repo_id = sys.argv[2] + hold_seconds = float(sys.argv[3]) + out_path = Path(sys.argv[4]) + + sys.path.insert(0, sys.argv[5]) # OC src/ + + from operations_center.audit_dispatch.errors import RepoLockAlreadyHeldError + from operations_center.audit_dispatch.lock_store import ( + PersistentLockPayload, + PersistentLockStore, + ) + + store = PersistentLockStore(state_dir) + payload = PersistentLockPayload( + repo_id=repo_id, + run_id=f"run_{os.getpid()}", + audit_type="representative", + oc_pid=os.getpid(), + started_at="2026-05-04T00:00:00Z", + command="x", + expected_run_status_path="/tmp/x", + ) + result = {"pid": os.getpid(), "acquired": False, "error": None} + try: + store.try_acquire(payload) + result["acquired"] = True + time.sleep(hold_seconds) + store.release(repo_id) + except RepoLockAlreadyHeldError as exc: + result["error"] = str(exc) + out_path.write_text(json.dumps(result)) + """ +) + + +def _run_competitor( + state_dir: Path, + repo_id: str, + hold_seconds: float, + out_path: Path, + oc_src: Path, +) -> subprocess.Popen: + return subprocess.Popen( + [ + sys.executable, + "-c", + _SUBPROCESS_SCRIPT, + str(state_dir), + repo_id, + str(hold_seconds), + str(out_path), + str(oc_src), + ], + ) + + +def _oc_src_dir() -> Path: + # tests/unit/audit_dispatch/test_lock_store_concurrency.py + # parents[0]=audit_dispatch [1]=unit [2]=tests [3]=OC root + return Path(__file__).resolve().parents[3] / "src" + + +class TestCrossProcessConcurrency: + def test_only_one_subprocess_acquires(self, tmp_path: Path) -> None: + out_a = tmp_path / "a.json" + out_b = tmp_path / "b.json" + oc_src = _oc_src_dir() + + # First holds the lock for 1.5s; second tries shortly after. 
+ proc_a = _run_competitor(tmp_path, "videofoundry", 1.5, out_a, oc_src) + # Brief spin to ensure A wins the race. + import time as _t + _t.sleep(0.3) + proc_b = _run_competitor(tmp_path, "videofoundry", 0.1, out_b, oc_src) + + proc_a.wait(timeout=10) + proc_b.wait(timeout=10) + + result_a = json.loads(out_a.read_text()) + result_b = json.loads(out_b.read_text()) + + # Exactly one acquires. + assert result_a["acquired"] != result_b["acquired"] + # The one that didn't acquire reports a meaningful error. + loser = result_b if result_a["acquired"] else result_a + assert loser["error"] is not None + assert "videofoundry" in loser["error"] + + def test_sequential_acquires_succeed(self, tmp_path: Path) -> None: + """After A releases, B acquires successfully — no leftover lock files.""" + out_a = tmp_path / "a.json" + out_b = tmp_path / "b.json" + oc_src = _oc_src_dir() + + proc_a = _run_competitor(tmp_path, "videofoundry", 0.1, out_a, oc_src) + proc_a.wait(timeout=10) + # A has fully released by now. 
+ proc_b = _run_competitor(tmp_path, "videofoundry", 0.1, out_b, oc_src) + proc_b.wait(timeout=10) + + result_a = json.loads(out_a.read_text()) + result_b = json.loads(out_b.read_text()) + assert result_a["acquired"] is True + assert result_b["acquired"] is True diff --git a/tests/unit/audit_dispatch/test_locks.py b/tests/unit/audit_dispatch/test_locks.py index 5f0e5593..6ea52609 100644 --- a/tests/unit/audit_dispatch/test_locks.py +++ b/tests/unit/audit_dispatch/test_locks.py @@ -8,6 +8,8 @@ import pytest +import tempfile + from operations_center.audit_dispatch.errors import RepoLockAlreadyHeldError from operations_center.audit_dispatch.locks import ( ManagedRepoAuditLock, @@ -16,7 +18,9 @@ def _registry() -> ManagedRepoAuditLockRegistry: - return ManagedRepoAuditLockRegistry() + """Build a registry with an isolated temp state_dir so tests don't pollute + the real OpsCenter state/audit_dispatch/locks/ directory.""" + return ManagedRepoAuditLockRegistry(state_dir=tempfile.mkdtemp()) class TestLockAcquire: @@ -116,6 +120,111 @@ def test_held_repos_is_snapshot_not_live(self) -> None: assert "videofoundry" in snapshot +class TestLazyStaleSweep: + """Slice C: registry reclaims stale locks on first use (post-OC-restart safety).""" + + def test_sweep_clears_dead_lock_before_first_acquire(self, tmp_path) -> None: + import os + import subprocess + import sys + + from operations_center.audit_dispatch.lock_store import ( + PersistentLockPayload, + PersistentLockStore, + ) + + # Plant a stale lock from a "previous OC process" using a real-then-dead PID. 
+ proc = subprocess.Popen([sys.executable, "-c", "pass"]) + proc.wait() + store = PersistentLockStore(tmp_path) + store._write_atomic( + tmp_path / "videofoundry.lock", + PersistentLockPayload( + repo_id="videofoundry", + run_id="leftover", + audit_type="representative", + oc_pid=proc.pid, + started_at="2026-05-04T00:00:00Z", + command="x", + expected_run_status_path="/tmp/x", + ), + ) + + # Fresh registry (simulates OC restart) — first acquire sweeps + succeeds. + reg = ManagedRepoAuditLockRegistry(state_dir=tmp_path) + lock = reg.acquire("videofoundry", run_id="new") + assert lock.payload is not None + assert lock.payload.run_id == "new" + lock.release() + + def test_sweep_only_runs_once_per_registry(self, tmp_path) -> None: + reg = ManagedRepoAuditLockRegistry(state_dir=tmp_path) + # Trigger sweep + lock1 = reg.acquire("repo_a") + lock1.release() + # Plant a stale lock after the registry has already swept once + import os + import subprocess + import sys + + from operations_center.audit_dispatch.lock_store import ( + PersistentLockPayload, + PersistentLockStore, + ) + + proc = subprocess.Popen([sys.executable, "-c", "pass"]) + proc.wait() + store = PersistentLockStore(tmp_path) + store._write_atomic( + tmp_path / "videofoundry.lock", + PersistentLockPayload( + repo_id="videofoundry", + run_id="leftover", + audit_type="representative", + oc_pid=proc.pid, + started_at="2026-05-04T00:00:00Z", + command="x", + expected_run_status_path="/tmp/x", + ), + ) + # The lazy sweep won't run again — but try_acquire will still detect + # the stale lock at acquire time because is_alive() returns False. 
+ lock2 = reg.acquire("videofoundry", run_id="new") + lock2.release() + + +class TestIdentityWiring: + """Slice B: lock payload carries run_id / audit_type / pids.""" + + def test_acquire_payload_records_identity(self, tmp_path) -> None: + import os + + reg = ManagedRepoAuditLockRegistry(state_dir=tmp_path) + lock = reg.acquire( + "videofoundry", + run_id="vid_rep_xyz", + audit_type="representative", + command="python -m foo", + expected_run_status_path="/tmp/x", + ) + assert lock.payload is not None + assert lock.payload.run_id == "vid_rep_xyz" + assert lock.payload.audit_type == "representative" + assert lock.payload.oc_pid == os.getpid() + assert lock.payload.audit_pid is None + lock.release() + + def test_update_audit_pid_persists(self, tmp_path) -> None: + reg = ManagedRepoAuditLockRegistry(state_dir=tmp_path) + lock = reg.acquire("videofoundry", run_id="r1") + lock.update_audit_pid(audit_pid=4242, audit_pgid=4242) + assert lock.payload is not None + assert lock.payload.audit_pid == 4242 + on_disk = reg.store.read("videofoundry") + assert on_disk is not None and on_disk.audit_pid == 4242 + lock.release() + + class TestThreadSafety: def test_concurrent_acquires_for_same_repo_only_one_succeeds(self) -> None: reg = _registry() diff --git a/tests/unit/audit_dispatch/test_watcher.py b/tests/unit/audit_dispatch/test_watcher.py new file mode 100644 index 00000000..a4ce74dc --- /dev/null +++ b/tests/unit/audit_dispatch/test_watcher.py @@ -0,0 +1,106 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +# Copyright (C) 2026 Velascat +"""Tests for the in-flight run_status watcher (Phase 6, Slice F).""" + +from __future__ import annotations + +import json +import threading +import time +from pathlib import Path + +from operations_center.audit_dispatch.watcher import ( + RunStatusSnapshot, + poll_run_status, +) + + +def _write_status(path: Path, status: str, current_phase: str = "running") -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text( + 
json.dumps( + { + "schema_version": "1.0", + "contract_name": "managed-repo-audit", + "producer": "videofoundry", + "repo_id": "videofoundry", + "run_id": "test_run_xyz", + "audit_type": "representative", + "status": status, + "current_phase": current_phase, + } + ), + encoding="utf-8", + ) + + +class TestPollRunStatus: + def test_yields_terminal_snapshot_when_already_completed(self, tmp_path: Path) -> None: + bucket = tmp_path / "bucket_test_run_xyz" + bucket.mkdir() + _write_status(bucket / "run_status.json", "completed", current_phase="completed") + + snapshots = list( + poll_run_status( + tmp_path, + "test_run_xyz", + poll_interval_s=0.05, + timeout_s=1.0, + ) + ) + assert len(snapshots) == 1 + assert snapshots[0].status == "completed" + assert snapshots[0].is_terminal is True + + def test_yields_each_distinct_state_change(self, tmp_path: Path) -> None: + bucket = tmp_path / "bucket_test_run_xyz" + bucket.mkdir() + status_path = bucket / "run_status.json" + _write_status(status_path, "running", current_phase="bootstrap") + + snapshots: list[RunStatusSnapshot] = [] + + def consumer() -> None: + for s in poll_run_status( + tmp_path, + "test_run_xyz", + poll_interval_s=0.05, + timeout_s=3.0, + ): + snapshots.append(s) + + t = threading.Thread(target=consumer) + t.start() + + time.sleep(0.2) + _write_status(status_path, "running", current_phase="rendering") + time.sleep(0.2) + _write_status(status_path, "completed", current_phase="completed") + t.join(timeout=5.0) + + statuses = [(s.status, s.current_phase) for s in snapshots] + # Initial running observed first; rendering phase change; final completed. 
+ assert ("running", "bootstrap") in statuses + assert ("completed", "completed") in statuses + assert snapshots[-1].is_terminal is True + + def test_yields_nothing_when_bucket_never_appears(self, tmp_path: Path) -> None: + snapshots = list( + poll_run_status( + tmp_path, + "no_such_run", + poll_interval_s=0.05, + timeout_s=0.3, + ) + ) + assert snapshots == [] + + def test_finds_bucket_by_run_id_substring(self, tmp_path: Path) -> None: + # Bucket name follows VF convention: "__" + bucket = tmp_path / "Connective_Contours_20260504_120000_test_run_xyz" + bucket.mkdir() + _write_status(bucket / "run_status.json", "completed") + snapshots = list( + poll_run_status(tmp_path, "test_run_xyz", poll_interval_s=0.05, timeout_s=1.0) + ) + assert len(snapshots) == 1 diff --git a/tests/unit/cli/test_audit_cli.py b/tests/unit/cli/test_audit_cli.py index 03ea313d..51d3e9cb 100644 --- a/tests/unit/cli/test_audit_cli.py +++ b/tests/unit/cli/test_audit_cli.py @@ -148,3 +148,193 @@ def test_resolve_manifest_not_found_exits_code_1(self, tmp_path: Path): out = _runner.invoke(app, ["resolve-manifest", str(tmp_path / "missing.json")]) assert out.exit_code == 1 assert "Not found" in out.output + + +# --------------------------------------------------------------------------- +# cmd_list_active + cmd_unlock + cmd_dispatch (Slice D) +# --------------------------------------------------------------------------- + + +class TestCmdListActive: + def test_list_active_empty(self, tmp_path: Path, monkeypatch): + from operations_center.audit_dispatch.lock_store import PersistentLockStore + + monkeypatch.setattr( + "operations_center.entrypoints.audit.main.get_global_registry", + lambda: type("R", (), {"store": PersistentLockStore(tmp_path)})(), + ) + out = _runner.invoke(app, ["list-active"]) + assert out.exit_code == 0 + assert "no active audit locks" in out.output + + def test_list_active_renders_table(self, tmp_path: Path, monkeypatch): + import os + + from 
operations_center.audit_dispatch.lock_store import ( + PersistentLockPayload, + PersistentLockStore, + ) + + store = PersistentLockStore(tmp_path) + store.try_acquire( + PersistentLockPayload( + repo_id="videofoundry", + run_id="vid_rep_xyz", + audit_type="representative", + oc_pid=os.getpid(), + started_at="2026-05-04T12:00:00Z", + command="python -m foo", + expected_run_status_path="/tmp/output", + ) + ) + monkeypatch.setattr( + "operations_center.entrypoints.audit.main.get_global_registry", + lambda: type("R", (), {"store": store})(), + ) + out = _runner.invoke(app, ["list-active"]) + assert out.exit_code == 0 + # Rich may truncate long strings in narrow terminals; assert the table renders. + assert "Active Audit Locks" in out.output + + def test_list_active_json(self, tmp_path: Path, monkeypatch): + import os + + from operations_center.audit_dispatch.lock_store import ( + PersistentLockPayload, + PersistentLockStore, + ) + + store = PersistentLockStore(tmp_path) + store.try_acquire( + PersistentLockPayload( + repo_id="videofoundry", + run_id="r1", + audit_type="representative", + oc_pid=os.getpid(), + started_at="2026-05-04T12:00:00Z", + command="x", + expected_run_status_path="/tmp/x", + ) + ) + monkeypatch.setattr( + "operations_center.entrypoints.audit.main.get_global_registry", + lambda: type("R", (), {"store": store})(), + ) + out = _runner.invoke(app, ["list-active", "--json"]) + assert out.exit_code == 0 + rows = json.loads(out.output) + assert rows[0]["repo_id"] == "videofoundry" + assert rows[0]["oc_pid_alive"] is True + + +class TestCmdUnlock: + def test_unlock_no_lock(self, tmp_path: Path, monkeypatch): + from operations_center.audit_dispatch.lock_store import PersistentLockStore + + monkeypatch.setattr( + "operations_center.entrypoints.audit.main.get_global_registry", + lambda: type("R", (), {"store": PersistentLockStore(tmp_path)})(), + ) + out = _runner.invoke(app, ["unlock", "--repo", "videofoundry"]) + assert out.exit_code == 0 + assert "No lock 
held" in out.output + + def test_unlock_refuses_alive(self, tmp_path: Path, monkeypatch): + import os + + from operations_center.audit_dispatch.lock_store import ( + PersistentLockPayload, + PersistentLockStore, + ) + + store = PersistentLockStore(tmp_path) + store.try_acquire( + PersistentLockPayload( + repo_id="videofoundry", + run_id="r1", + audit_type="representative", + oc_pid=os.getpid(), # alive + started_at="2026-05-04T12:00:00Z", + command="x", + expected_run_status_path="/tmp/x", + ) + ) + monkeypatch.setattr( + "operations_center.entrypoints.audit.main.get_global_registry", + lambda: type("R", (), {"store": store})(), + ) + out = _runner.invoke(app, ["unlock", "--repo", "videofoundry"]) + assert out.exit_code == 1 + assert "still alive" in out.output + # Lock still exists. + assert store.read("videofoundry") is not None + + def test_unlock_force_releases_alive(self, tmp_path: Path, monkeypatch): + import os + + from operations_center.audit_dispatch.lock_store import ( + PersistentLockPayload, + PersistentLockStore, + ) + + store = PersistentLockStore(tmp_path) + store.try_acquire( + PersistentLockPayload( + repo_id="videofoundry", + run_id="r1", + audit_type="representative", + oc_pid=os.getpid(), + started_at="2026-05-04T12:00:00Z", + command="x", + expected_run_status_path="/tmp/x", + ) + ) + monkeypatch.setattr( + "operations_center.entrypoints.audit.main.get_global_registry", + lambda: type("R", (), {"store": store})(), + ) + out = _runner.invoke(app, ["unlock", "--repo", "videofoundry", "--force"]) + assert out.exit_code == 0 + assert "Force-released" in out.output + assert store.read("videofoundry") is None + + def test_unlock_releases_stale(self, tmp_path: Path, monkeypatch): + import subprocess + import sys + + from operations_center.audit_dispatch.lock_store import ( + PersistentLockPayload, + PersistentLockStore, + ) + + proc = subprocess.Popen([sys.executable, "-c", "pass"]) + proc.wait() + store = PersistentLockStore(tmp_path) + 
store._write_atomic( + tmp_path / "videofoundry.lock", + PersistentLockPayload( + repo_id="videofoundry", + run_id="r1", + audit_type="representative", + oc_pid=proc.pid, # dead + started_at="2026-05-04T12:00:00Z", + command="x", + expected_run_status_path="/tmp/x", + ), + ) + monkeypatch.setattr( + "operations_center.entrypoints.audit.main.get_global_registry", + lambda: type("R", (), {"store": store})(), + ) + out = _runner.invoke(app, ["unlock", "--repo", "videofoundry"]) + assert out.exit_code == 0 + assert "Released stale" in out.output + assert store.read("videofoundry") is None + + +class TestCmdDispatch: + def test_dispatch_alias_invokes_run(self, tmp_path: Path): + with patch(_DISPATCH_TARGET, return_value=_make_dispatch_result()): + out = _runner.invoke(app, ["dispatch", "videofoundry", "representative"]) + assert out.exit_code == 0 + assert "videofoundry" in out.output
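
The diff above leans on two mechanisms throughout: the signal-0 liveness probe behind the dual-PID "alive iff *any* recorded PID is alive" rule, and the write-tempfile + `os.replace` atomic write. A minimal standalone sketch of both, for reference — the names `probe_alive`, `lock_is_alive`, and `write_atomic` are illustrative, not the actual `PersistentLockStore` API:

```python
from __future__ import annotations

import json
import os
import tempfile
from pathlib import Path


def probe_alive(pid: int | None) -> bool:
    """Signal-0 probe: True iff ``pid`` names a live process (POSIX).

    ``os.kill(pid, 0)`` delivers no signal; it only checks existence.
    """
    if pid is None:
        return False
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # process exists but belongs to another user
    return True


def lock_is_alive(payload: dict) -> bool:
    """Dual-PID rule: the lock is held iff *any* recorded PID is alive."""
    return probe_alive(payload.get("oc_pid")) or probe_alive(payload.get("audit_pid"))


def write_atomic(path: Path, payload: dict) -> None:
    """Tempfile-then-``os.replace`` so readers never see a half-written lock."""
    fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump(payload, fh)
        os.replace(tmp, path)  # atomic rename on the same filesystem
    except BaseException:
        Path(tmp).unlink(missing_ok=True)  # clean up the orphaned tempfile
        raise
```

In the real store these steps additionally run under the `fcntl.flock` sentinel taken via `audit_governance/file_locks`, so the probe-then-write sequence cannot interleave across processes — the sketch shows only the per-call mechanics.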