Merged
2 changes: 2 additions & 0 deletions .console/backlog.md
@@ -8,6 +8,8 @@ _(none — system locked at Rev 10)_

## Up Next

- [x] **Phase 6 — dispatch control crash-safety + dual-PID tracking (2026-05-04, branch phase6-dispatch-control)**: All 6 slices complete. New `audit_dispatch/lock_store.py` (PersistentLockStore + dual-PID payload, atomic writes, fcntl sentinel via audit_governance/file_locks); `locks.py` refactored as façade over the store with full-identity acquire signature; `executor.execute()` accepts `on_spawn(pid, pgid)` callback; `api.py` carries identity through; stale-PID reclaim + lazy first-use sweep; new CLI commands `list-active / unlock / dispatch / watch` on `operations-center-audit`; cross-process concurrency proof test; in-flight run_status watcher (polling, no watchdog dep). Sentinel-glob bug fixed (`_iter_lock_files` filters `.lock.lock` recursive sentinels). Tests: 64 new + all existing passing. Full unit suite 2041 pass.

- [ ] Phase 13 or next operator directive TBD

## Done
7 changes: 7 additions & 0 deletions .console/log.md
@@ -3,10 +3,17 @@
_Chronological continuity log. Decisions, stop points, what changed and why._
_Not a task tracker — that's backlog.md. Keep entries concise and dated._

## Stop Points

- Phase 6 — dispatch control crash-safety + dual-PID tracking (2026-05-04, branch `phase6-dispatch-control`): All 6 slices landed. (A) New `audit_dispatch/lock_store.py` with `PersistentLockStore` + `PersistentLockPayload`; atomic write-tempfile + `os.replace`; `fcntl.flock` sentinel via the existing `audit_governance/file_locks.locked_state_file` helper (lazy-imported to avoid `audit_governance` package-init cycle). Dual-PID payload baked in: `oc_pid` (supervisor) + `audit_pid` (subprocess) + `audit_pgid`. (B) `locks.py` refactored to delegate persistence to the store; `acquire_audit_lock(repo_id, *, run_id, audit_type, oc_pid, command, expected_run_status_path)` carries identity. `executor.execute()` accepts `on_spawn(pid, pgid)` callback wired to `lock.update_audit_pid` so the subprocess PID is patched into the lock immediately after Popen. `api.py` resolves the absolute expected output dir and threads it through. (C) Stale-reclaim policy: a lock is alive iff *any* recorded PID is alive (`os.kill(pid, 0)`). Fresh registry sweeps on first use to recover from OC crash. Corrupt lock files treated as stale (operator can `unlock --force`). (D) New CLI commands on `operations-center-audit`: `list-active` (Rich table or `--json`), `unlock --repo X [--force]`, `dispatch <repo> <type>` positional alias, `watch --repo X` (in-flight `run_status.json` polling). (E) `tests/unit/audit_dispatch/test_lock_store_concurrency.py` spawns two real subprocesses competing for the same repo lock; asserts exactly one acquires. (F) New `watcher.py` exposes `poll_run_status(expected_output_dir, run_id)` iterator yielding `RunStatusSnapshot` on each on-disk content change, terminating on `completed/failed/interrupted`; locates VF buckets by `run_id` substring match per the existing report-naming convention (no `watchdog` dep). Sentinel-glob bug caught in test: `_iter_lock_files` filters out `*.lock.lock`-style sentinels so sweep doesn't recursively wrap. 
Test counts: 22 lock_store + 18 locks + 18 audit-cli + 2 cross-process + 4 watcher tests; full unit suite 2041 pass (the architecture_invariants collection error is pre-existing on main).
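
The atomic write mentioned in slice (A) can be sketched roughly as follows. This is an illustration only, not the code in `lock_store.py`: the helper name `atomic_write_json` and the temp-file naming are assumptions, and only the write-tempfile-then-`os.replace` idea comes from the entry above.

```python
import json
import os
import tempfile
from pathlib import Path


def atomic_write_json(path: Path, payload: dict) -> None:
    """Write payload so readers see the old file or the new one, never a partial one."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # The temp file lives next to the target: os.replace is only atomic
    # when source and destination are on the same filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=path.name + ".", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as tmp:
            json.dump(payload, tmp, ensure_ascii=False)
            tmp.flush()
            os.fsync(tmp.fileno())  # push the bytes to disk before the rename
        os.replace(tmp_name, path)
    except BaseException:
        # Remove the temp file if anything failed before the rename.
        try:
            os.unlink(tmp_name)
        except FileNotFoundError:
            pass
        raise
```

A crash before `os.replace` leaves at most a stray `.tmp` file and the previous lock payload intact, which is the property a crash-safe lock store needs.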

## Recent Decisions

| Decision | Rationale | Date |
|----------|-----------|------|
| Phase 6 dual-PID lock (oc_pid + audit_pid) | Single-PID design left orphaned audit subprocesses invisible to liveness checks (OC dies, audit lives → lock looks reclaimable but artifact writes still happening). Dual-PID treats lock as alive iff any recorded PID is alive — orphaned audit holds the lock until it exits, which is correct. | 2026-05-04 |
| Persistent lock at state/audit_dispatch/locks/{repo_id}.lock | Matches existing OC `state/{subsystem}/...` convention. JSON payload, atomic os.replace. fcntl.flock sentinel via existing audit_governance file_locks helper — no new dep, cross-process exclusion proven by test_lock_store_concurrency.py. | 2026-05-04 |
| Polling over watchdog for in-flight run_status observation | watchdog is not a current OC runtime dep. Polling at 2s default interval is adequate for human-observable lifecycle transitions (running→completed). No new runtime dep. | 2026-05-04 |
| C41 json.dumps ensure_ascii=False | 131 json.dumps calls across 43 files now include ensure_ascii=False; prevents silent Unicode escaping in logs and payloads | 2026-05-03 |
| T4 orphan fixtures deleted | Custodian T4 detector flagged default_proposal(), default_decision() (policy/conftest.py) and index_from_example_failed() (behavior_calibration/conftest.py) as never requested; all removed; 279 tests pass | 2026-05-02 |
| `artifact_manifest_path` is `Optional[str]` in model | VF doesn't write it yet; `is_compliant` enforces it without rejecting legacy files | 2026-04-26 |
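
The dual-PID rule in the first decision row ("alive iff any recorded PID is alive") can be illustrated with a short POSIX-only sketch. The function names `pid_alive` and `lock_is_alive` are hypothetical; the only detail taken from the PR is the `os.kill(pid, 0)` probe. Note that a PID can in principle be reused by an unrelated process, which this sketch does not guard against.

```python
import os
from typing import Optional


def pid_alive(pid: Optional[int]) -> bool:
    """Return True if a process with this PID currently exists (POSIX only)."""
    if pid is None or pid <= 0:
        return False
    try:
        os.kill(pid, 0)  # signal 0 checks existence and permission; nothing is delivered
    except ProcessLookupError:
        return False
    except PermissionError:
        # The process exists but belongs to another user.
        return True
    return True


def lock_is_alive(oc_pid: Optional[int], audit_pid: Optional[int]) -> bool:
    """A lock is stale only when every recorded PID is dead."""
    return pid_alive(oc_pid) or pid_alive(audit_pid)
```

With this rule, a crashed supervisor whose audit subprocess is still running keeps the lock held, so a second dispatch cannot start writing artifacts into the same repo.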
23 changes: 22 additions & 1 deletion src/operations_center/audit_dispatch/__init__.py
@@ -27,11 +27,23 @@
 """

 from .api import dispatch_managed_audit
-from .errors import AuditDispatchConfigError, AuditDispatchError, RepoLockAlreadyHeldError
+from .errors import (
+    AuditDispatchConfigError,
+    AuditDispatchError,
+    LockStoreCorruptError,
+    RepoLockAlreadyHeldError,
+    StaleLockReclaimedWarning,
+)
+from .lock_store import (
+    LOCK_SCHEMA_VERSION,
+    PersistentLockPayload,
+    PersistentLockStore,
+)
 from .locks import (
     ManagedRepoAuditLock,
     ManagedRepoAuditLockRegistry,
     acquire_audit_lock,
+    get_global_registry,
     is_audit_locked,
 )
 from .models import (
@@ -40,18 +52,27 @@
     ManagedAuditDispatchRequest,
     ManagedAuditDispatchResult,
 )
+from .watcher import RunStatusSnapshot, poll_run_status

 __all__ = [
     "dispatch_managed_audit",
     "AuditDispatchError",
     "AuditDispatchConfigError",
     "RepoLockAlreadyHeldError",
+    "LockStoreCorruptError",
+    "StaleLockReclaimedWarning",
+    "LOCK_SCHEMA_VERSION",
+    "PersistentLockPayload",
+    "PersistentLockStore",
     "ManagedRepoAuditLock",
     "ManagedRepoAuditLockRegistry",
     "acquire_audit_lock",
+    "get_global_registry",
     "is_audit_locked",
     "DispatchStatus",
     "FailureKind",
     "ManagedAuditDispatchRequest",
     "ManagedAuditDispatchResult",
+    "RunStatusSnapshot",
+    "poll_run_status",
 ]
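
The newly exported `poll_run_status` is described in the log as a polling iterator that yields on each on-disk content change and stops on `completed/failed/interrupted`. A simplified sketch of that pattern is shown here under several assumptions: it watches one known file instead of locating the VF bucket by `run_id`, it yields plain dicts rather than `RunStatusSnapshot`, and the function name `poll_status_file` and the `status` key are hypothetical.

```python
import json
import time
from collections.abc import Iterator
from pathlib import Path
from typing import Optional

TERMINAL_STATES = {"completed", "failed", "interrupted"}


def poll_status_file(path: Path, interval_seconds: float = 2.0) -> Iterator[dict]:
    """Yield the parsed JSON whenever the file content changes; stop on a terminal status."""
    last_raw: Optional[str] = None
    while True:
        try:
            raw = path.read_text(encoding="utf-8")
            data = json.loads(raw)
        except (FileNotFoundError, json.JSONDecodeError):
            # Not written yet, or caught mid-write: retry on the next tick.
            raw, data = None, None
        if isinstance(data, dict) and raw != last_raw:
            last_raw = raw
            yield data
            if data.get("status") in TERMINAL_STATES:
                return
        time.sleep(interval_seconds)
```

Comparing raw file content rather than modification times keeps the loop independent of filesystem timestamp resolution, at the cost of re-reading the file on every tick.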
18 changes: 16 additions & 2 deletions src/operations_center/audit_dispatch/api.py
@@ -169,12 +169,26 @@ def dispatch_managed_audit(
     )
     executor = ManagedAuditExecutor(effective_log_dir)

-    # Step 3: Acquire per-repo lock — raises if already held.
-    lock = acquire_audit_lock(request.repo_id)
+    # Step 3: Acquire per-repo lock with full identity payload — raises if held.
+    # The actual run_status.json path is not known until VF creates the per-run
+    # bucket dir (named with run_id as a suffix). Persist the parent output dir
+    # so external observers can discover the bucket via the same logic as lifecycle.py.
+    expected_output_dir_abs = str(
+        (Path(working_dir_abs) / invocation.expected_output_dir).resolve()
+    )
+    lock = acquire_audit_lock(
+        request.repo_id,
+        run_id=run_id,
+        audit_type=request.audit_type,
+        oc_pid=os.getpid(),
+        command=invocation.command,
+        expected_run_status_path=expected_output_dir_abs,
+    )
     try:
         proc_result = executor.execute(
             invocation,
             timeout_seconds=request.timeout_seconds,
+            on_spawn=lock.update_audit_pid,
         )
     finally:
         lock.release()
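
Per the log, the acquire in Step 3 is made cross-process safe by an `fcntl.flock` sentinel obtained through the existing `locked_state_file` helper. That helper's signature is not shown in this diff, so the sketch below uses a hypothetical stand-in, `exclusive_sentinel`, to illustrate the general technique on POSIX systems. The `<name>.lock` sentinel suffix is likewise an assumption, chosen to match the `.lock.lock` files mentioned in the backlog entry.

```python
import fcntl
import os
from collections.abc import Iterator
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def exclusive_sentinel(lock_path: Path) -> Iterator[None]:
    """Hold an exclusive advisory lock on '<lock_path>.lock' for the duration of the block."""
    sentinel = lock_path.with_name(lock_path.name + ".lock")
    sentinel.parent.mkdir(parents=True, exist_ok=True)
    fd = os.open(sentinel, os.O_RDWR | os.O_CREAT, 0o644)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX)  # blocks until no other holder remains
        yield
    finally:
        # Closing the descriptor also releases the flock.
        os.close(fd)
```

The read-check-write sequence on the JSON lock file would run inside `with exclusive_sentinel(path):`, so two processes cannot both conclude that the repo is unlocked.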
16 changes: 16 additions & 0 deletions src/operations_center/audit_dispatch/errors.py
@@ -42,3 +42,19 @@ class AuditDispatchConfigError(AuditDispatchError):
     These are not operational failures — they indicate the dispatch was
     incorrectly configured and must be fixed before retrying.
     """
+
+
+class LockStoreCorruptError(AuditDispatchError):
+    """A persistent lock file exists but is malformed (bad JSON or schema).
+
+    Recoverable via ``operations-center-audit unlock --repo X --force`` after
+    operator inspection.
+    """
+
+
+class StaleLockReclaimedWarning(UserWarning):
+    """Emitted when a stale persistent lock is reclaimed during normal dispatch.
+
+    A warning rather than an error — reclaim is the intended recovery path
+    after an OpsCenter crash.
+    """
14 changes: 14 additions & 0 deletions src/operations_center/audit_dispatch/executor.py
@@ -15,6 +15,7 @@
 import shlex
 import signal
 import subprocess
+from collections.abc import Callable
 from dataclasses import dataclass
 from datetime import UTC, datetime
 from pathlib import Path
@@ -65,6 +66,7 @@ def execute(
         *,
         timeout_seconds: float | None = None,
         cwd_override: str | None = None,
+        on_spawn: Callable[[int, int | None], None] | None = None,
     ) -> ProcessResult:
         """Execute the managed audit command.

@@ -104,6 +106,18 @@ def execute(
             open(stderr_path, "w", encoding="utf-8", errors="replace") as err_f,
         ):
             proc = subprocess.Popen(args, stdout=out_f, stderr=err_f, **popen_kwargs)
+            if on_spawn is not None:
+                pgid: int | None = None
+                if platform.system() != "Windows":
+                    try:
+                        pgid = os.getpgid(proc.pid)
+                    except (ProcessLookupError, OSError):
+                        pgid = None
+                try:
+                    on_spawn(proc.pid, pgid)
+                except Exception:
+                    # on_spawn must never break dispatch — best-effort hook.
+                    pass
             try:
                 proc.wait(timeout=timeout_seconds)
             except subprocess.TimeoutExpired:
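
The `on_spawn` hook in the hunk above can be exercised in isolation with a small POSIX-only sketch. Everything here except the callback shape `(pid, pgid)` is an assumption made for illustration: `run_with_spawn_hook` and `record_identity` are hypothetical names, and `start_new_session=True` is a guess, since the contents of `popen_kwargs` are not shown in this diff.

```python
import os
import subprocess
import sys
from collections.abc import Callable
from typing import Optional


def run_with_spawn_hook(
    args: list[str],
    on_spawn: Optional[Callable[[int, Optional[int]], None]] = None,
) -> int:
    """Start a child in its own session, report (pid, pgid) once, then wait for it."""
    proc = subprocess.Popen(args, start_new_session=True)
    if on_spawn is not None:
        try:
            pgid: Optional[int] = os.getpgid(proc.pid)
        except OSError:
            pgid = None  # the child is already gone
        try:
            on_spawn(proc.pid, pgid)
        except Exception:
            pass  # best-effort hook: a failing observer must not abort the run
    return proc.wait()


recorded: dict[str, Optional[int]] = {}


def record_identity(pid: int, pgid: Optional[int]) -> None:
    # In the PR this role is played by lock.update_audit_pid.
    recorded.update(audit_pid=pid, audit_pgid=pgid)


exit_code = run_with_spawn_hook([sys.executable, "-c", "pass"], on_spawn=record_identity)
```

Calling the hook immediately after `Popen` keeps the window in which the lock knows only the supervisor PID as short as possible, which matters if the supervisor crashes early in the run.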