diff --git a/anton/README.md b/anton/README.md index 74abc5fa..484e78b2 100644 --- a/anton/README.md +++ b/anton/README.md @@ -11,7 +11,7 @@ And here we are: Like an adrenaline junkie eyeing at a bungee looking for anothe It is probably obvious now, but Anton has a brain-inspired architecture, and the more we build it the more it resembles/mirrors functional parts of the brain. On the other hand we also understand that people don't need to know anything about the brain to play with Anton, so we mapped some of the places/files where users can have inputs, or investigate what's up, to names that make more sense than the scientific name of that function of the brain. -The current implementation has six blocks, mapping the major learning systems: +The current implementation has seven blocks, mapping the major learning systems: | Brain Region | Function | Anton Equivalent | |------------------------------|---------------------------------------------------|---------------------------------------------------------------| @@ -20,9 +20,10 @@ The current implementation has six blocks, mapping the major learning systems: | Hippocampus | Episodic memory, records experiences | Experience Store — logs of problem + context + solution | | Cortex (semantic memory) | Facts, rules, identity — the consolidated knowledge | Engrams — `lessons.md`, `rules.md`, `profile.md` | | Striatum (procedural memory) | Habits and learned procedures — patterns of action | Skills — multi-stage reusable procedures with declarative + chunked + code representations | -| Cerebellum (error learning) | Supervised correction — "what I expected vs what happened" | Cerebellum — buffers errored scratchpad cells, extracts generalizable lessons via post-mortem | +| Cerebellum (per-cell error learning) | Supervised correction on a single action — "what I expected vs what happened" | Cerebellum — buffers errored scratchpad cells, extracts generalizable lessons via post-mortem | +| Anterior Cingulate Cortex (turn-level error detection) | Notices when the same kind of error pattern fires more than once within an episode — the brain's ERN | ACC — observes turn events, flags repeat patterns, produces lessons that flow through the same Engram pipeline | -These six systems coexist the way they coexist in the brain: declarative and procedural memory are dissociable (a person with hippocampal damage like H.M. can lose new declarative memories but still learn motor skills), and the cerebellum operates in parallel with continued action rather than blocking it. +These seven systems coexist the way they coexist in the brain: declarative and procedural memory are dissociable (a person with hippocampal damage like H.M. can lose new declarative memories but still learn motor skills); the cerebellum operates in parallel with continued action rather than blocking it; and the ACC watches the whole turn rather than any single cell, complementing rather than replacing the cerebellum. @@ -134,7 +135,8 @@ And the Hipocampus also is controlled as follows: | **Medial Temporal Lobe** (episodic) | `episodes.py` | Raw episodic memory. Logs every conversation turn as timestamped JSONL — user input, assistant responses, tool calls, scratchpad output. Searchable via the `recall` tool. Like HSAM: never forgets. | | **Hippocampal Replay** (SWS consolidation) | `consolidator.py` | After a scratchpad session ends, replays what happened in compressed form and extracts durable lessons via a fast LLM call. Like sleep — offline, post-hoc, selective. | | **Striatum** (procedural memory) | `skills.py` | Long-term procedural memory. Stores reusable skills as multi-stage directories (declarative → chunks → code). The LLM retrieves skills on demand via the `recall_skill` tool, the way the basal ganglia activates a learned action sequence in response to a familiar context. | -| **Cerebellum** (supervised error learning) | `cerebellum.py` | Forward-model + error correction. Observes every scratchpad cell via pre/post execute hooks, buffers errored/warning cells across the turn, and runs a post-mortem LLM diff to extract generalizable lessons. Lessons flow through the existing wisdom-injection pipeline into future code generation. Operates in parallel with the agent — never blocks. | +| **Cerebellum** (supervised error learning) | `cerebellum.py` | Forward-model + error correction at the *single-cell* time scale. Observes every scratchpad cell via pre/post execute hooks, buffers errored/warning cells across the turn, and runs a post-mortem LLM diff to extract generalizable lessons. Lessons flow through the existing wisdom-injection pipeline into future code generation. Operates in parallel with the agent — never blocks. | +| **Anterior Cingulate Cortex** (ERN — turn-level error detection) | `acc.py` | Pattern-level error detection at the *whole-turn* time scale. Watches a stream of typed events (scratchpad calls/results, tool calls/results, history repairs, round milestones) and runs pure-function detectors at end-of-turn. Lessons it emits flow through the same `cortex.encode()` path the cerebellum uses; it does not own storage. Implemented as a standalone module with passing tests; not yet wired into `ChatSession`. | | **Reconsolidation** (Nader et al.) | `reconsolidator.py` | One-time migration. When old memory formats are reactivated, they enter a labile state and get re-encoded in the new format. Preserves content, updates structure. | | **Medial PFC / Default Mode Network** | `profile.md` | Always-on self-model. Identity facts (name, timezone, preferences) that contextualize all processing — you don't "look up" your own name. | | **Basal Ganglia + OFC** | `rules.md` | Go/No-Go behavioral gates. The direct pathway enables ("always"), the indirect pathway suppresses ("never"), the OFC handles conditions ("when X → do Y"). | @@ -587,6 +589,76 @@ Brain analog: cerebellar plasticity (LTD at parallel-fiber → Purkinje cell syn These appear in `lessons.md` like any other engram, carry the same metadata, and get pruned by the same compaction loop when memory grows past threshold. +## Anterior Cingulate Cortex — Pattern-Level Error Detection + +The Cerebellum learns from a single failed cell. The ACC learns from a *pattern across multiple events* within one turn. Brain analog: the anterior cingulate cortex fires the *error-related negativity* (ERN) ~80 ms after the brain notices that an actual outcome diverged from an expected one. That signal flows downstream to the dopaminergic midbrain (reward prediction error), the striatum (action policy update), and the dorsolateral PFC (strategy adjustment). + +Anton's ACC watches a turn unfold, classifies events as they arrive, and at end-of-turn extracts actionable lessons from patterns that fired more than once. Real failure modes it's designed to catch: + +- **Scratchpad name switch** — the LLM started in `build_pres`, switched to `write_html`, then `pres1`. Each scratchpad name is a separate isolated environment; variables in one don't exist in another. Burned 8 rounds on recovery in the original session. +- **Oversized cell drops** — large HTML strings serialised to empty `code` in the tool-call schema. Repeated >5KB cells fail the same way; the LLM didn't realise the schema was clipping it. +- **Repeated tool error** — the same tool failed three times in a row with the same args (the publish-from-chat bug — three identical failed calls before pivoting). + +### Status + +Implemented at `anton/core/memory/acc.py` with 51 passing tests, plus 14 wiring tests at `tests/test_session_acc_init.py`. Layers 1 and 2 are wired into `ChatSession`: + + - **Layer 1 — passive learning.** Emit sites fire `acc.observe(...)` at scratchpad/tool/repair/cap-exhaust hooks; `_schedule_acc_flush()` runs at end-of-turn alongside the cerebellum flush. Detected lessons become `Engram` objects whose `kind` (always/never/when) was tagged by the detector, so they flow into the right section of `rules.md`. Future turns pick them up via the existing memory→system-prompt pipeline. + - **Layer 2 — mid-turn nudges.** `at_round_n()` runs after each tool-call round; when a NEW detector fires (one nudge per detector per turn), the lesson text gets appended as a `text` block inside the same user-role message that carries the round's `tool_result` blocks. The LLM sees the alarm on its very next round, not on the next turn. Off by default — gated on `ANTON_ACC_MODE=active`. + +Modes (env var `ANTON_ACC_MODE`, mirrors `ANTON_MEMORY_MODE`): + +| Mode | Behaviour | +|---|---| +| `off` | ACC observes nothing — events drop at the safe-emit wrapper. Use to disable the feature entirely. | +| `passive` (default) | Layer 1 only. Lessons drain to memory at end-of-turn; next turn's system prompt picks them up. Adds zero surface to the turn loop. | +| `active` | Layer 1 + Layer 2. Lessons ALSO inject inline as text blocks in `tool_results` so the LLM sees them on the very next round. Stronger learning signal; more invasive. | + +What is NOT yet wired (deliberate): + - **Layer 3 — retrieval-scored rule ranking.** At system-prompt assembly, score each candidate rule by relevance to the current turn's context and load the top-K within the token budget. Per-rule retrieval counters age out rules that never make the cut. Pairs with optional outcome-tracking (did the rule reduce its target pattern after it landed?). Needs a small embedding index over rules + a ranker call on the load path. + +### Vocabulary discipline + +The ACC enforces a closed event vocabulary via the `EVENT_KINDS` frozenset. `observe()` raises `ValueError` on unknown kinds. A test asserts that every kind in `EVENT_KINDS` is read by at least one detector — the previous wide `KNOWN_PRODUCER_ONLY` allowlist was deliberately collapsed to a single justified entry (`tool_call`, reserved for a future `detect_orphaned_tool_call`). A second test guards against dropped kinds (`context_compaction`, `round_milestone`) silently reappearing. + +### Event vocabulary (9 kinds) + +| Kind | Detail shape | Read by | +|---|---|---| +| `scratchpad_call` | `{name, code_len, one_line_description}` | `detect_name_switch`, `detect_oversized_cell` | +| `scratchpad_result` | `{name, success, stdout_len, error}` | `detect_repeated_error_signature` | +| `scratchpad_empty_code` | `{name}` | `detect_oversized_cell` | +| `scratchpad_reset` | `{name, reason}` | `detect_reset_churn` | +| `scratchpad_killed` | `{name, reason}` | `detect_kill_loop` | +| `tool_call` | `{name, args_summary}` | *(producer-only — reserved for future `detect_orphaned_tool_call`)* | +| `tool_result` | `{name, success, error}` | `detect_repeated_tool_error`, `detect_repeated_error_signature` | +| `history_repair` | `{reason}` | `detect_repair_churn` | +| `cap_exhausted` | `{}` | `detect_cap_exhausted` | + +### Detectors (9 pure functions over the event stream) + +Detectors are stateless functions of `Sequence[Event] → Lesson | None`. Each detector that fires produces a one-sentence rule. Cross-detector dedupe at `at_end_of_turn()` collapses overlapping lessons. + +| Detector | Fires when | Cognitive failure it learns from | +|---|---|---| +| `detect_name_switch` | ≥2 distinct scratchpad names in one turn | Identity sprawl across scratchpads — each name is a separate venv. | +| `detect_oversized_cell` | Observed empty-code drops OR ≥2 cells over ~5 KB | Silent schema truncation of large `code` strings. | +| `detect_repeated_tool_error` | ≥2 consecutive failures of same tool | Blind retry of same tool. | +| `detect_repeated_error_signature` | Same normalised error signature ≥3 times across any producers | Blind retry across tools / arg tweaks (generalises the publish-from-chat + gmail bug patterns). | +| `detect_reset_churn` | ≥2 scratchpad resets in one turn | Abandoning state instead of debugging in place. | +| `detect_kill_loop` | ≥2 cells killed on the same scratchpad name | Writing cells that hang — approach too heavy. | +| `detect_severity_climb` | Per-producer strictly-increasing severity run of length ≥3 ending ≥5 | Situation deteriorating without strategy change — ERN crossed threshold. | +| `detect_repair_churn` | ≥3 `history_repair` events in one turn | LLM generating malformed tool_use/result structurally; conversation derailing. | +| `detect_cap_exhausted` | A single `cap_exhausted` event | Round cap hit → mandatory post-mortem rather than silent retry. | + +### Error-signature normalisation + +`detect_repeated_error_signature` runs each error string through `_normalise_error_signature()` before counting — a cheap regex pass that collapses paths, integers, hex addresses, and short quoted tokens into placeholders. That way `"Refusing to save record for engine='gmail-1'"` and `"Refusing to save record for engine='gmail-2'"` hash to the same signature and the detector catches the underlying loop even when the LLM tweaks args between attempts. + +### Producer, not storage + +Like the cerebellum, the ACC is a *producer*. It does not own storage. Lessons it generates flow into the same Engram pipeline that the cerebellum and consolidator already use. The de-dupe predicate is caller-supplied (`has_similar_lesson`) so the wiring layer can choose substring, embedding, or semantic similarity without changing ACC internals. + ## Structured Output — `LLMClient.generate_object` Anton has a single primitive for getting structured data out of the LLM, used by the cerebellum, the consolidator, the cortex's identity/compaction passes, the connect collector, the skill drafter, and the custom-datasource flow. It lives at `anton/llm/client.py`: @@ -662,7 +734,7 @@ The split between *planning* and *coding* providers preserves the original inten ## The Engram — Fundamental Unit of Memory -Every memory trace is represented as an `Engram` dataclass: +Every memory trace is represented as an `Engram` dataclass, defined in `anton/core/memory/base.py`: ```python @dataclass @@ -678,7 +750,7 @@ class Engram: Named for Karl Lashley's *engram* — the hypothesized physical substrate of a memory trace. Each engram flows through the system: ``` -Source (user/LLM/consolidation) +Source (user/LLM/consolidation/cerebellum/ACC) → Engram created → Cortex.encoding_gate() — needs confirmation? → yes: queued for user review before next prompt @@ -689,34 +761,50 @@ Source (user/LLM/consolidation) → lesson: append to lessons.md + optionally topics/{slug}.md ``` +### `HippocampusProtocol` — the storage interface + +`base.py` also defines a `HippocampusProtocol` — a `runtime_checkable` structural `typing.Protocol` describing the public contract of a single-scope memory store (the `recall_*` and `encode_*` methods listed in the next section). The concrete file-backed `Hippocampus` in `hippocampus.py` satisfies it automatically via structural sub-typing. The protocol exists so alternate backends (database-backed, cloud-synced) can be substituted without inheriting from the file-based implementation. This is the seam Enterprise adapters plug into. + ## Module Reference +The long-term memory system lives under `anton/core/memory/`. A small set of legacy / orthogonal modules still lives at the top level under `anton/memory/`. + ``` -anton/memory/ -├── hippocampus.py Engram + Hippocampus class +anton/core/memory/ LONG-TERM MEMORY (brain-mapped modules) +├── base.py Engram dataclass + HippocampusProtocol (structural backend interface) +├── hippocampus.py Hippocampus class — file-backed implementation of the protocol ├── cortex.py Cortex class (executive declarative-memory coordinator) ├── episodes.py Episode + EpisodicMemory class ├── consolidator.py Consolidator class (sleep-replay → Engrams) -├── cerebellum.py Cerebellum class (supervised error learning over scratchpad cells) -├── skills.py Skill, SkillStore, SkillStats — procedural memory storage layer -├── reconsolidator.py needs_reconsolidation() + reconsolidate() functions -├── learnings.py [legacy] LearningStore — replaced by Hippocampus -└── store.py SessionStore — session history (orthogonal to long-term memory) - -anton/llm/ +├── cerebellum.py Cerebellum class (per-cell supervised error learning) +├── acc.py AnteriorCingulate class (turn-level pattern error detection) +└── skills.py Skill, SkillStore, SkillStats — procedural memory storage layer + +anton/memory/ LEGACY / ORTHOGONAL (not the brain-mapped memory system) +├── reconsolidator.py needs_reconsolidation() + reconsolidate() — one-time format migration +├── manage.py MemoryManage class — handlers for /memory and /setup > Memory, MEMORY_MODES dict +├── history_store.py HistoryStore — chat session persistence (transcripts on disk) +├── store.py SessionStore — session list / metadata (different from history_store) +└── learnings.py [legacy] LearningStore — pre-Hippocampus format, kept only for migration + +anton/core/llm/ ├── client.py LLMClient with plan/code/generate_object/generate_object_code ├── structured.py build_structured_tool + unwrap_structured_response (shared helper) -└── ... anthropic.py, openai.py, provider.py, prompt_builder.py +└── ... anthropic.py, openai.py, provider.py, prompt_builder.py, prompts.py -anton/tools/ +anton/core/tools/ ├── recall_skill.py RECALL_SKILL_TOOL — the LLM's procedural memory retrieval primitive ├── tool_handlers.py handle_scratchpad with pre/post-execute observer firing └── ... registry.py, tool_defs.py ``` +### `base.py` — Engram + HippocampusProtocol + +`Engram` is the fundamental memory-trace dataclass (see "The Engram" section above). `HippocampusProtocol` is a `runtime_checkable` structural Protocol defining the read/write contract of a single-scope memory store. Alternate backends (Enterprise, cloud-synced, database-backed) satisfy the protocol by shape — no inheritance from the file-based class needed. + ### `hippocampus.py` — Storage Engine -The Hippocampus handles one scope (global OR project). It doesn't decide what to remember — it just reads and writes. +The Hippocampus handles one scope (global OR project) and is the canonical file-backed implementation of `HippocampusProtocol`. It doesn't decide what to remember — it just reads and writes. **Retrieval methods:** | Method | Reads | Brain Analog | @@ -768,7 +856,7 @@ The EpisodicMemory handles raw conversation logging and recall. | `should_replay(cells)` | Heuristic check: errors, 5+ cells, or cancellations → True | | `replay_and_extract(cells, llm)` | Compress cells → `generate_object_code(_ConsolidatedLessons, ...)` → return Engrams | -### `cerebellum.py` — Supervised Error Learning +### `cerebellum.py` — Supervised Error Learning (per-cell) | Method | Purpose | |---|---| @@ -780,6 +868,23 @@ The EpisodicMemory handles raw conversation logging and recall. | `_run_diff(cells)` | Internal: send buffered cells to `generate_object_code(_DiffPassResult, ...)` and return validated lessons. | | `_encode_lessons(lessons)` | Internal: wrap lessons as Engrams (`kind="lesson"`, `topic="scratchpad"`, `source="consolidation"`) and route through `Cortex.encode()`. | +### `acc.py` — Anterior Cingulate Cortex (turn-level) + +Pattern-level error detection across a turn. Pure detectors as free functions plus a small stateful `AnteriorCingulate` that holds the event stream. + +| Element | Purpose | +|---|---| +| `Event` | Dataclass with `kind`, `severity`, `detail`, `round_idx`. The atomic observation. | +| `Lesson` | Dataclass with `rule`, `triggers`, `detector`. What a detector emits. | +| `EVENT_KINDS` | Frozenset of 9 canonical event-kind strings. `observe()` rejects unknown kinds. | +| `DETECTORS` | Tuple of 9 pure detector functions — see the table in the ACC section above. | +| `AnteriorCingulate.observe(kind, ...)` | Append an `Event`. Rejects unknown kinds. | +| `AnteriorCingulate.at_end_of_turn(has_similar_lesson=...)` | Run all detectors, dedupe via caller-supplied predicate, return new `Lesson` list. | +| `AnteriorCingulate.clear()` | Drop the event stream (between turns). | +| `AnteriorCingulate.events` / `event_kind_counts` | Read-only views for inspection and testing. | + +Tests live at `tests/test_acc.py` (44 tests, 4 layers: pure-function detectors → state tests → JSON-fixture replay → vocabulary discipline). Fixtures at `tests/fixtures/acc/{name_switch,oversized_cell,publish_failure_loop,reset_churn,kill_loop}.json`. + ### `skills.py` — Procedural Memory Store | Method | Purpose | @@ -821,7 +926,48 @@ Two pure helper functions for forced-tool-call structured output. Used by both ` ## Integration Points in chat.py -The memory + skills + cerebellum systems are wired into `ChatSession` and `_chat_loop()`: +The memory + skills + cerebellum + ACC systems are wired into `ChatSession` (defined in `anton/chat.py` for the CLI entry-points; the runtime class actually lives in `anton/core/session.py`, with chat-loop wiring at the top level in `anton/chat.py`). The cerebellum wires via the dispatcher's scratchpad observer list; the ACC wires via direct `session._acc.observe(...)` calls at each emit site (broader emit footprint than scratchpad alone — also tools, history-repair, round-cap). + +### ACC emit sites (Layer 1) + +| Event kind | Emit site | File | +|---|---|---| +| `scratchpad_call` | After args validation in `handle_scratchpad` exec branch | `core/tools/tool_handlers.py` | +| `scratchpad_result` | After `pad.execute()` returns a non-killed cell | `core/tools/tool_handlers.py` | +| `scratchpad_empty_code` | When `prepare_scratchpad_exec` rejects the call | `core/tools/tool_handlers.py` | +| `scratchpad_reset` | After `pad.reset()` in the reset action | `core/tools/tool_handlers.py` | +| `scratchpad_killed` | After `pad.execute()` returns a cell whose `error` starts with `Cancelled`/`Cell timed out`/`Cell killed` | `core/tools/tool_handlers.py` | +| `tool_call` | At top of per-tc loop in `_stream_and_handle_tools` | `core/session.py` | +| `tool_result` | After result_text is finalized, before `tool_results.append` | `core/session.py` | +| `history_repair` | After `_seal_dangling_tool_uses` actually inserts synthetic blocks | `core/session.py` | +| `cap_exhausted` | When `tool_round > self._max_tool_rounds` | `core/session.py` | + +### End-of-turn drain + +`_schedule_acc_flush()` lives next to `_schedule_cerebellum_flush()` and runs at the same two spots — end of `turn()` and end of `turn_stream()`. Fire-and-forget: detectors are pure (no LLM call) and the only async work is `cortex.encode()`, but we still wrap the encode in `asyncio.create_task` so file I/O doesn't block the user-facing reply. + +Each `Lesson` becomes an `Engram(text=rule, kind=lesson.kind, scope="global", confidence="high", source="consolidation")`. The `kind` is whatever the detector tagged (`always`/`never`/`when`) — no string-matching at the wiring layer. Lessons land in `~/.anton/memory/rules.md` under the corresponding `## Always` / `## Never` / `## When` section, and the next turn's system prompt picks them up via the existing `cortex.build_memory_context()` pipeline. + +### Mid-turn nudge (`_acc_maybe_nudge`) + +Called by `_stream_and_handle_tools` immediately after `tool_results` is built and before that user message gets appended to history. When `ANTON_ACC_MODE == "active"`: + + 1. Runs `acc.at_round_n()` — re-evaluates every detector against the events buffered so far this turn and returns only lessons whose detector hasn't already nudged this turn. + 2. For each newly-fired lesson, appends `{"type": "text", "text": "[Anton self-check — ] "}` to the `tool_results` content array. + 3. The LLM sees those text blocks alongside the tool_result blocks on its very next round. + +One nudge per detector per turn — re-stating the same alarm round after round would inflate history without changing behaviour. Cleared on the same `clear()` boundary the event buffer uses. The mid-turn path deliberately does NOT consult `has_similar_lesson`: if a rule is already in `rules.md` but the LLM is violating it right now, repeating the rule inline is the whole point. + +### `_acc_observe` safe-emit wrapper + +Every emit site calls `session._acc_observe(kind, detail, ...)` rather than touching `session._acc` directly. This wrapper: + - Returns silently when the ACC isn't attached (defensive). + - Returns silently when the cortex is disabled (`mode == "off"`) — observation without persistence is pointless. + - Catches `ValueError` from `observe()` on unknown kinds so emit-site drift never breaks a turn. + +### De-dupe predicate + +The ACC is constructed with `has_similar_lesson=_acc_has_similar`, a closure that does a cheap substring match against the current `rules.md` content. Prevents the same lesson being re-encoded on every turn. Embedding similarity is a v2 upgrade. ``` 1. _chat_loop() startup: diff --git a/anton/core/memory/acc.py b/anton/core/memory/acc.py new file mode 100644 index 00000000..17c35e72 --- /dev/null +++ b/anton/core/memory/acc.py @@ -0,0 +1,744 @@ +"""Anterior Cingulate Cortex (ACC) — pattern-level error detection. + +Brain analogue +============== + +The anterior cingulate cortex is the brain's error-detector. It fires +the *error-related negativity* (ERN) ~80 ms after the brain notices +that an actual outcome diverged from an expected one. The signal it +emits flows downstream to the dopaminergic midbrain (computing the +*reward prediction error*) and onward to the striatum (updating +action policies) and the dorsolateral prefrontal cortex (adjusting +strategy on the next attempt). + +Anton's analogue +================ + +Anton's ACC watches a turn unfold, classifies events as they arrive, +and at end-of-turn extracts *actionable lessons* from the patterns +that fired more than once. The cerebellum (in this directory) +already handles the per-cell forward-model error — predicted vs +actual outcome of a single scratchpad cell. The ACC handles a +larger time scale: patterns *across* cells within a single task. +Examples we've seen in real sessions: + + - The LLM switched scratchpad names mid-task (`build_pres` → + `write_html` → `pres1`). Each name is a separate isolated + environment; variables in one don't exist in another. Burned 8 + rounds on recovery. + + - Repeated scratchpad calls with empty `code` parameters because a + large HTML string serialised to "" in the tool-call schema. + Burned 10 rounds before anyone noticed. + + - The same tool failed three times in a row with the same args. + +These are not per-cell prediction errors — they're patterns over +multiple events. ACC's job is to notice them and write one-sentence +rules that the next turn's system prompt picks up via the existing +cortex memory path. + +Storage and retrieval +===================== + +The ACC is a *producer* only. It does not own its own storage. +Lessons it extracts flow into the same `Engram` pipeline that the +cerebellum and the consolidator already use — see `cortex.encode()` +and `consolidator.consolidate()`. The ACC does not duplicate the +de-dupe logic; it consults a caller-supplied `has_similar_lesson` +predicate so the wiring layer can use whichever similarity check +fits (substring, embedding, semantic). + +Design constraints +================== + + - **Pure where possible.** Each `detect_*` function is a free + function of `Sequence[Event] → Lesson | None`. Easy to unit-test + in isolation, easy to add new ones, easy to delete bad ones. + + - **No LLM calls at detect time.** Detectors are deterministic + pattern matchers. The lesson `rule` strings are templates + parameterised by what was seen. Keep this property: the cortex + has its own LLM-based consolidation; ACC's job is to surface + candidates cheaply. + + - **De-dupe at the seam.** Two detectors might match overlapping + patterns. Resolution is on the caller — pass a `has_similar_lesson` + that consults the existing memory store before persisting. + + - **Bounded buffer.** Events are kept in memory for one turn only; + `clear()` drops them at end-of-turn. The persisted artifact is + the lesson, not the event log. + +Adding a new detector +===================== + + 1. Write a function `detect_(events: Sequence[Event]) -> + Lesson | None`. + 2. Add it to `DETECTORS`. + 3. Add a positive + negative unit test in `tests/test_acc.py`. + 4. If the pattern was discovered from a real failure, also drop + the captured event trace into `tests/fixtures/acc/.json` + and write a replay test that asserts the lesson fires. + +That recipe keeps the regression suite as a living catalogue of +Anton's known failure modes — exactly the post-mortem loop that +inspired this module. +""" + +from __future__ import annotations + +import re +from collections import Counter, defaultdict +from dataclasses import dataclass, field +from typing import Callable, Literal, Sequence + + +# ───────────────────────────────────────────────────────────────────────────── +# Public data types +# ───────────────────────────────────────────────────────────────────────────── + + +@dataclass(frozen=True) +class Event: + """One observed signal in the current turn. + + Attributes: + kind: One of the canonical strings in `EVENT_KINDS`. Adding a + new value requires updating `EVENT_KINDS` and writing at + least one detector that consumes it. + severity: 1-10. Detectors use this as a rough filter ("only + care about ≥ 5 events"). The wiring layer picks the + value at emit time — we deliberately don't try to score + here, because severity is context-dependent and the + emitter has more context than the detector does. + detail: Free-form payload. Detectors that need a specific + field document it in their docstring. + round_idx: The 0-based round inside the current turn. Useful + for clustering (e.g. "three failures within 4 rounds"). + """ + + kind: str + severity: int + detail: dict + round_idx: int + + +# Canonical kind vocabulary. Add a new value here only when you also +# add (a) the emitter that publishes it and (b) a detector that +# reads it. Drift between these three is what makes event-based +# architectures rot — keeping the list explicit is the brake. +EVENT_KINDS = frozenset({ + # Scratchpad + "scratchpad_call", # detail: {name, code_len, one_line_description} + "scratchpad_result", # detail: {name, success, stdout_len, error} + "scratchpad_empty_code", # detail: {name} — code parameter was empty/missing + "scratchpad_reset", # detail: {name, reason} — venv/state cleared mid-turn + "scratchpad_killed", # detail: {name, reason} — cell killed (timeout/cancel/OOM) + # Tools + "tool_call", # detail: {name, args_summary} — paired with tool_result; producer-only today + "tool_result", # detail: {name, success, error} + # Session-level + "history_repair", # detail: {reason} — tool_use/result mismatch repair fired + "cap_exhausted", # detail: {} — terminal: hit max_rounds +}) + + +LessonKind = Literal["always", "never", "when"] + + +@dataclass(frozen=True) +class Lesson: + """An actionable rule the ACC wants future-Anton to remember. + + Attributes: + rule: The one-sentence rule to memorise. Phrase as guidance + ("Use ONE scratchpad name per task — switching mid-task + isolates variables in separate environments.") rather + than judgement of past behaviour. + kind: Which behavioural section this rule lands in when it + flows through `Cortex.encode()`. Maps directly to + `Engram.kind`: + * `always` — unconditional habit ("Use ONE scratchpad…"). + * `never` — unconditional prohibition ("Don't reset…"). + * `when` — conditional rule ("When a tool fails…"). + The detector knows the semantics; routing by string- + matching the rule at the wiring layer would be brittle. + triggers: The event kinds that contributed to firing this + lesson. Used for audit + for de-dupe across detectors. + detector: Name of the detector function that produced it. + Helps when an unexpected lesson shows up in memory — + you can find its origin without a full re-derivation. + """ + + rule: str + kind: LessonKind + triggers: tuple[str, ...] + detector: str + + +# Type alias for clarity at the seams. +Detector = Callable[[Sequence[Event]], Lesson | None] + + +# ───────────────────────────────────────────────────────────────────────────── +# Detectors — pure functions of the event sequence +# ───────────────────────────────────────────────────────────────────────────── + + +# Threshold for "cell was too big for the scratchpad to handle reliably". +# Chosen at 5000 chars based on the empirical "silent empty-code drop" +# pattern seen on payloads above ~4-6 KB. Tuneable. +_OVERSIZED_CELL_CHARS = 5000 + +# Minimum distinct names seen before we flag fragmentation. Two is +# enough to fire — there is no legitimate reason to spin up a second +# scratchpad mid-task; if there were we'd add an exception, but +# every observed instance has been an error. +_NAME_SWITCH_THRESHOLD = 2 + +# Minimum consecutive failures of the same tool before we flag a +# "retry-with-same-args" loop. Two is enough for the lesson to be +# actionable; lifts to three would miss the cap-exhaustion case +# where the round budget runs out at attempt 2. +_REPEATED_TOOL_ERROR_THRESHOLD = 2 + +# Minimum occurrences of the same normalised error signature across +# any producers (tool_result / scratchpad_result) before we flag a +# blind-retry loop. Three is the right floor: two same-errors is the +# LLM doing one legitimate retry after a transient failure; three is +# a real loop. Generalises detect_repeated_tool_error to errors that +# repeat across different tools or with arg tweaks in between. +_REPEATED_ERROR_THRESHOLD = 3 + +# Number of scratchpad resets in one turn before we flag state- +# abandonment. Two is enough — one reset can be intentional cleanup, +# two means the LLM is using reset as a debugging primitive. +_RESET_CHURN_THRESHOLD = 2 + +# Number of cells killed (timeout/cancel/OOM) on the same scratchpad +# name before we flag a "writing cells that hang" pattern. +_KILL_LOOP_THRESHOLD = 2 + +# Number of history_repair events in one turn before we flag a +# derailing conversation. Three is the right floor — one or two +# repairs are normal model-side hiccups; three means the LLM is +# generating malformed tool_use/result pairs structurally. +_REPAIR_CHURN_THRESHOLD = 3 + +# detect_severity_climb fires when a producer emits a strictly- +# increasing severity sequence of length >= _SEVERITY_CLIMB_LEN whose +# final element is >= _SEVERITY_CLIMB_PEAK. Brain analog: the ERN +# climbs as outcomes worsen — when the climb crosses a threshold, +# the dlPFC is supposed to switch strategy rather than amplify. +_SEVERITY_CLIMB_LEN = 3 +_SEVERITY_CLIMB_PEAK = 5 + + +def _normalise_error_signature(text: str) -> str: + """Collapse the variable parts of an error message into a stable + signature so that "Refusing to save record for engine='gmail-1'" + and "Refusing to save record for engine='gmail-2'" hash to the + same string. Cheap regex pass — paths, integers, hex, quoted + short tokens all become placeholders. + """ + s = text or "" + s = re.sub(r"0x[0-9a-fA-F]+", "0xN", s) + s = re.sub(r"\b\d+\b", "N", s) + s = re.sub(r"'[^']{1,80}'", "'X'", s) + s = re.sub(r'"[^"]{1,80}"', '"X"', s) + s = re.sub(r"(?:[A-Za-z]:)?(?:/|\\\\)[^\s'\"]+", "/P", s) + s = re.sub(r"\s+", " ", s).strip() + return s + + +def detect_name_switch(events: Sequence[Event]) -> Lesson | None: + """The LLM used >1 distinct scratchpad name in one turn. + + Each scratchpad name is a separate isolated environment in + anton's runtime. Variables in `A` are not visible to code + running in `B`. The LLM occasionally treats them as one + namespace and ends up re-defining symbols, re-fetching data, + and burning rounds. + + Reads `kind == "scratchpad_call"` events; looks at `detail.name`. + """ + names = { + e.detail.get("name") + for e in events + if e.kind == "scratchpad_call" and e.detail.get("name") + } + if len(names) < _NAME_SWITCH_THRESHOLD: + return None + return Lesson( + rule=( + "Use ONE scratchpad name per task and reuse it for every cell. " + "Each name is a separate isolated environment — variables in one " + "are not visible to code running in another. Switching mid-task " + "(e.g. `build_pres` → `write_html`) forces re-imports, re-fetches, " + "and burns rounds on recovery." + ), + kind="always", + triggers=("scratchpad_call",), + detector="detect_name_switch", + ) + + +def detect_oversized_cell(events: Sequence[Event]) -> Lesson | None: + """A scratchpad cell exceeded the size we know to be reliable. + + Empirical: scratchpad calls with code payloads above ~5 KB + occasionally drop the `code` parameter entirely (the tool sees + an empty string and returns "No code provided"), which still + counts against the round cap. The lesson: build the output + incrementally on disk instead of holding it all in one string. + + Reads `kind == "scratchpad_call"` (uses `detail.code_len`) and + `kind == "scratchpad_empty_code"` (the failure mode itself). + """ + too_big = [ + e for e in events + if e.kind == "scratchpad_call" and int(e.detail.get("code_len", 0)) >= _OVERSIZED_CELL_CHARS + ] + empty = [e for e in events if e.kind == "scratchpad_empty_code"] + if not too_big and not empty: + return None + # Don't fire on a single one-off big cell; only when there's + # either a *pattern* of big cells OR we've actually observed the + # empty-code failure mode that big cells trigger. + if not empty and len(too_big) < 2: + return None + return Lesson( + rule=( + "Keep individual scratchpad cells under ~5 KB of string content. " + "When generating large outputs (HTML, reports), write to disk " + "incrementally — `open(path, 'w')` once, then `open(path, 'a')` " + "to append each chunk. Building one large string in memory and " + "writing it in a single cell occasionally drops the `code` " + "parameter on the wire and silently burns a round." + ), + kind="always", + triggers=tuple({"scratchpad_call", "scratchpad_empty_code"} & {e.kind for e in (*too_big, *empty)}), + detector="detect_oversized_cell", + ) + + +def detect_repeated_tool_error(events: Sequence[Event]) -> Lesson | None: + """The same tool returned an error >= N times in a row. + + Reads `kind == "tool_result"` (uses `detail.name`, `detail.success`, + `detail.error`). Looks for the longest consecutive run of failures + keyed on the tool name. The lesson is generic — the rule reminds + future-Anton that retrying with no diagnosis is the failure mode; + the specific tool name lands in `triggers` for audit. + """ + # Walk in chronological order, track longest run per tool name. + longest: defaultdict[str, int] = defaultdict(int) + current_name: str | None = None + current_run = 0 + for e in events: + if e.kind != "tool_result": + continue + name = e.detail.get("name") or "" + if not e.detail.get("success", False): + if name == current_name: + current_run += 1 + else: + current_name = name + current_run = 1 + longest[name] = max(longest[name], current_run) + else: + # success breaks the run + current_name = None + current_run = 0 + bad = [name for name, n in longest.items() if n >= _REPEATED_TOOL_ERROR_THRESHOLD] + if not bad: + return None + return Lesson( + rule=( + "When a tool fails, don't retry with the same arguments. " + "Inspect the error, change one thing (args, preconditions, " + "or strategy), and only then retry. A run of identical " + "failures is signal to escalate — surface the error to the " + "user or pick a different tool." + ), + kind="when", + triggers=("tool_result",), + detector="detect_repeated_tool_error", + ) + + +def detect_repeated_error_signature(events: Sequence[Event]) -> Lesson | None: + """The same normalised error signature appeared >= N times in one turn. + + Generalises `detect_repeated_tool_error`: the latter only catches + consecutive failures of the *same tool with the same args*. This + detector catches the broader pattern where the SAME error message + keeps appearing across the turn — across different tools, across + arg tweaks, anywhere. Maps directly to two real bugs we've already + debugged: + + - publish-from-chat: "PUBLISH FAILED: settings module unavailable" + appeared three times across `publish_or_preview` retries. + - gmail datavault: "Refusing to save empty credential record" + appeared multiple times even as the LLM tweaked arg shapes. + + Reads `kind in {"tool_result", "scratchpad_result"}`; takes + `detail.success` and `detail.error`. + """ + sigs: list[str] = [] + for e in events: + if e.kind not in ("tool_result", "scratchpad_result"): + continue + if e.detail.get("success", True): + continue + err = e.detail.get("error") or "" + if not err: + continue + sigs.append(_normalise_error_signature(err)) + if not sigs: + return None + top, count = Counter(sigs).most_common(1)[0] + if count < _REPEATED_ERROR_THRESHOLD: + return None + return Lesson( + rule=( + "When the same error message appears repeatedly in one turn, " + "the cause hasn't changed between attempts — retrying won't help. " + "Stop, inspect what produced the error, and either fix the " + "precondition, pick a different tool, or surface the failure to " + "the user. The error signature is the signal; the tool name is not." + ), + kind="when", + triggers=("tool_result", "scratchpad_result"), + detector="detect_repeated_error_signature", + ) + + +def detect_reset_churn(events: Sequence[Event]) -> Lesson | None: + """Scratchpad was reset/cleared >= N times in one turn. + + A reset wipes the venv: all imports, all in-memory state, all + partial results are gone. One reset can be intentional cleanup + (the user asked for a clean slate). Two or more means the LLM is + using reset as a debugging primitive — almost always a mistake, + because the next cell now has to re-import, re-fetch, and re- + reason from scratch, burning rounds. + + Reads `kind == "scratchpad_reset"`. + """ + resets = [e for e in events if e.kind == "scratchpad_reset"] + if len(resets) < _RESET_CHURN_THRESHOLD: + return None + return Lesson( + rule=( + "Don't reset the scratchpad to recover from errors. A reset " + "discards every variable, import, and partial result — the next " + "cell has to re-fetch and re-import everything. Debug the " + "failing cell in place: print local state, narrow the scope, " + "comment out the broken bit. Reach for reset only when the venv " + "is genuinely corrupted, not when one cell raised." + ), + kind="never", + triggers=("scratchpad_reset",), + detector="detect_reset_churn", + ) + + +def detect_kill_loop(events: Sequence[Event]) -> Lesson | None: + """The same scratchpad name had >= N cells killed (timeout/cancel/OOM). + + Reads `kind == "scratchpad_killed"`; looks at `detail.name`. + """ + by_name: defaultdict[str, int] = defaultdict(int) + for e in events: + if e.kind != "scratchpad_killed": + continue + n = e.detail.get("name") or "" + if n: + by_name[n] += 1 + if not by_name or max(by_name.values()) < _KILL_LOOP_THRESHOLD: + return None + return Lesson( + rule=( + "When a scratchpad cell is killed (timeout, cancel, OOM), " + "the next cell on the same scratchpad needs to be smaller — " + "fewer rows, smaller batch, explicit timeout, narrower scope. " + "Two kills on the same scratchpad means the approach itself is " + "too heavy, not that the same cell needs another try." + ), + kind="when", + triggers=("scratchpad_killed",), + detector="detect_kill_loop", + ) + + +def detect_severity_climb(events: Sequence[Event]) -> Lesson | None: + """A producer emits a strictly-increasing severity sequence ending high. + + Per-producer (grouped by `detail.name`), find a strictly increasing + severity run of length >= _SEVERITY_CLIMB_LEN whose final element is + >= _SEVERITY_CLIMB_PEAK. That's the "situation is deteriorating" + pattern: each attempt fails worse than the last and the LLM keeps + going. Brain analog: the ACC's error signal climbs as outcomes + worsen — when it crosses threshold the dlPFC is supposed to switch + strategy rather than amplify. + + Reads severity from every event; uses `detail.name` to bucket per + producer. No specific kind required — that's intentional, this + pattern can show up on scratchpad_result OR tool_result. + """ + by_name: defaultdict[str, list[int]] = defaultdict(list) + for e in events: + n = e.detail.get("name") or "" + if not n: + continue + by_name[n].append(int(e.severity)) + for sevs in by_name.values(): + # Longest strictly-increasing suffix ending at each index. + run = 1 + for i in range(1, len(sevs)): + if sevs[i] > sevs[i - 1]: + run += 1 + if run >= _SEVERITY_CLIMB_LEN and sevs[i] >= _SEVERITY_CLIMB_PEAK: + return Lesson( + rule=( + "When successive results on the same target get " + "worse rather than better — escalating severity, " + "deeper failures — the current strategy is wrong, " + "not under-applied. Stop iterating and switch " + "approach: different tool, different decomposition, " + "or surface the situation to the user." + ), + kind="when", + triggers=("scratchpad_result", "tool_result"), + detector="detect_severity_climb", + ) + else: + run = 1 + return None + + +def detect_repair_churn(events: Sequence[Event]) -> Lesson | None: + """`history_repair` fired >= N times in one turn. + + The history-repair pass kicks in when the LLM's tool_use / tool_result + sequence is structurally malformed (orphaned tool_use, mismatched ids, + out-of-order results). One repair is a hiccup, two is unlucky, three + means the conversation is structurally derailing and the LLM is + burning rounds on recovery instead of progress. + + Reads `kind == "history_repair"`. + """ + n = sum(1 for e in events if e.kind == "history_repair") + if n < _REPAIR_CHURN_THRESHOLD: + return None + return Lesson( + rule=( + "Repeated history-repair events in one turn mean the LLM is " + "generating malformed tool_use/result pairs structurally — " + "the conversation is derailing. Pause the loop, surface the " + "situation to the user, and ask for direction instead of " + "continuing to retry." + ), + kind="when", + triggers=("history_repair",), + detector="detect_repair_churn", + ) + + +def detect_cap_exhausted(events: Sequence[Event]) -> Lesson | None: + """The turn hit the round cap. Single-occurrence — not a pattern + detector, but a *trigger* for the post-mortem path. + + Reads `kind == "cap_exhausted"`. + """ + if not any(e.kind == "cap_exhausted" for e in events): + return None + return Lesson( + rule=( + "Hitting the round cap means a goal was attempted that didn't " + "fit the available budget. Don't silently declare done — " + "produce a post-mortem: what was tried, what failed, what the " + "next turn should try differently. The user gets context; " + "future turns get a lesson; nothing is lost in the void." + ), + kind="when", + triggers=("cap_exhausted",), + detector="detect_cap_exhausted", + ) + + +# Ordered list — earlier entries get first crack at firing. Order +# matters only when two detectors would produce overlapping lessons; +# `at_end_of_turn` de-dupes anyway, but a stable order makes test +# assertions less brittle. +DETECTORS: tuple[Detector, ...] = ( + detect_name_switch, + detect_oversized_cell, + detect_repeated_tool_error, + detect_repeated_error_signature, + detect_reset_churn, + detect_kill_loop, + detect_severity_climb, + detect_repair_churn, + detect_cap_exhausted, +) + + +# ───────────────────────────────────────────────────────────────────────────── +# The ACC itself — per-session error-detection coordinator +# ───────────────────────────────────────────────────────────────────────────── + + +class AnteriorCingulate: + """Per-turn error pattern detector. + + Instantiated once per `ChatSession`. The session calls `observe()` + whenever a noteworthy event happens (scratchpad result, tool + failure, history repair, …). At end-of-turn the session calls + `at_end_of_turn()` to extract any lessons that fell out of the + event sequence; the wiring layer then routes them through the + existing `cortex.encode()` path to land in long-term memory. + + Thread-safety: not thread-safe. The session is single-threaded + per turn; if multiple turns share an ACC instance, the buffer + needs locking. We intentionally don't add locking here because + the simpler invariant (one ACC per turn, dropped after) is easier + to reason about. Wiring should follow that. + """ + + def __init__( + self, + *, + has_similar_lesson: Callable[[str], bool] | None = None, + detectors: Sequence[Detector] = DETECTORS, + ) -> None: + self._events: list[Event] = [] + self._has_similar = has_similar_lesson or (lambda _rule: False) + self._detectors = tuple(detectors) + # Names of detectors that have already produced a mid-turn + # nudge (via at_round_n) this turn. Tracks "newly fired" so + # the same alarm isn't injected into history on every + # subsequent round. Reset by `clear()` between turns. + self._nudged_detectors: set[str] = set() + + def observe( + self, + kind: str, + detail: dict | None = None, + *, + severity: int = 1, + round_idx: int = 0, + ) -> None: + """Append an event to the turn's buffer. + + Raises `ValueError` for unknown `kind` so the vocabulary + stays disciplined. If you need a new kind, add it to + `EVENT_KINDS` and to at least one detector. + """ + if kind not in EVENT_KINDS: + raise ValueError( + f"Unknown ACC event kind: {kind!r}. " + f"Add it to EVENT_KINDS in anton/core/memory/acc.py " + f"and to at least one detector before emitting." + ) + self._events.append(Event( + kind=kind, + severity=int(severity), + detail=dict(detail or {}), + round_idx=int(round_idx), + )) + + def at_end_of_turn(self) -> list[Lesson]: + """Run every detector against the buffered events. + + Returns lessons that: + - actually fired (detector returned a non-None Lesson), and + - aren't already in memory (per `has_similar_lesson`). + + The buffer is NOT cleared automatically — callers may want to + inspect it after extraction (for logging, telemetry, an + end-of-task post-mortem). Call `clear()` explicitly between + turns. + """ + out: list[Lesson] = [] + seen_rules: set[str] = set() + for d in self._detectors: + lesson = d(self._events) + if lesson is None: + continue + # Cross-detector de-dupe by rule text. Two detectors + # shouldn't produce the same rule, but if a future + # contributor adds an overlapping pattern this keeps the + # memory store clean. + if lesson.rule in seen_rules: + continue + if self._has_similar(lesson.rule): + continue + seen_rules.add(lesson.rule) + out.append(lesson) + return out + + def at_round_n(self) -> list[Lesson]: + """Run detectors against the current buffer and return only + *newly-fired* lessons since the previous call this turn. + + Layer 2 — mid-turn nudging. Where `at_end_of_turn()` runs once + per turn to drain lessons into long-term memory, this runs after + each tool-call round so the LLM sees the alarm right when the + pattern is happening, not on the next turn. + + Brain analog: the ACC's ERN fires as soon as a divergence is + detected. The dlPFC reads the alarm and can adjust strategy on + the very next action — not at end-of-task. + + Implementation notes: + - Detectors are pure functions, so re-running them on the + growing event buffer is cheap. No memoisation. + - We track *which detectors* have already nudged this turn + (`self._nudged_detectors`) so the same alarm doesn't get + injected into history on every subsequent round. One nudge + per detector per turn is enough — if the LLM ignores it, + re-stating it round after round won't help and would only + inflate the history. + - We deliberately do NOT consult `has_similar_lesson` here. + For mid-turn nudges we want to re-assert the rule inline + even when it already lives in `rules.md` — the LLM clearly + isn't following the in-prompt version, so making it visible + again in immediate context is the whole point. + + Returns the newly-fired lessons (possibly empty). + """ + fresh: list[Lesson] = [] + for d in self._detectors: + name = getattr(d, "__name__", "") + if name in self._nudged_detectors: + continue + lesson = d(self._events) + if lesson is None: + continue + self._nudged_detectors.add(name) + fresh.append(lesson) + return fresh + + def clear(self) -> None: + """Drop the event buffer. Call between turns.""" + self._events.clear() + self._nudged_detectors.clear() + + # ── Introspection helpers (used by tests + telemetry) ──────────── + + @property + def events(self) -> tuple[Event, ...]: + """Read-only view of the current turn's events.""" + return tuple(self._events) + + @property + def event_kind_counts(self) -> dict[str, int]: + """Histogram of event kinds in the current turn. Cheap; used + by `__repr__` and by any future per-session telemetry.""" + return dict(Counter(e.kind for e in self._events)) + + def __repr__(self) -> str: # pragma: no cover — debug only + return f"AnteriorCingulate(events={len(self._events)}, by_kind={self.event_kind_counts})" diff --git a/anton/core/session.py b/anton/core/session.py index d012718f..3e0f685c 100644 --- a/anton/core/session.py +++ b/anton/core/session.py @@ -4,12 +4,15 @@ from collections.abc import AsyncIterator, Callable from dataclasses import asdict, dataclass, field import json +import os from typing import TYPE_CHECKING from anton.core.backends.base import Cell, ScratchpadRuntimeFactory from anton.core.backends.local import local_scratchpad_runtime_factory from anton.core.datasources.data_vault import DataVault from anton.core.llm.prompt_builder import ChatSystemPromptBuilder, SystemPromptContext +from anton.core.memory.acc import AnteriorCingulate +from anton.core.memory.base import Engram from anton.core.memory.cerebellum import Cerebellum from anton.core.memory.skills import SkillStore from anton.core.tools.recall_skill import RECALL_SKILL_TOOL @@ -146,10 +149,55 @@ def __init__(self, config: ChatSessionConfig) -> None: cortex=self._cortex, llm=self._llm, ) + # Anterior Cingulate Cortex: turn-level pattern detection. + # Where the cerebellum looks at one cell and asks "did this + # cell do what it claimed", the ACC looks at the whole turn + # and asks "is the same failure pattern firing more than + # once". Emit points are scattered (scratchpad dispatcher, + # tool dispatch, history-repair, round-cap) rather than + # routed through the scratchpad observer list, because most + # of what the ACC watches isn't scratchpad-scoped. The + # session holds the reference; emit sites call + # `session._acc.observe(kind, detail, ...)` directly. + # + # has_similar_lesson: cheap substring check against the + # current rules.md content. Avoids re-encoding the same + # rule every turn. Embedding similarity is a v2 upgrade. + def _acc_has_similar(rule: str) -> bool: + cortex = getattr(self, "_cortex", None) + hc = getattr(cortex, "global_hc", None) if cortex else None + if hc is None: + return False + try: + existing = hc.recall_rules() or "" + except Exception: + return False + probe = (rule or "")[:60].lower() + return bool(probe) and probe in existing.lower() + + self._acc = AnteriorCingulate(has_similar_lesson=_acc_has_similar) + # ANTON_ACC_MODE controls how aggressively ACC affects the + # turn. Mirrors ANTON_MEMORY_MODE for shape consistency: + # "off" — ACC observes nothing (skipped at every emit site). + # "passive" — Layer 1: lessons drain to memory at end-of-turn, + # next turn's system prompt picks them up. SAFE + # DEFAULT — adds no surface-area to the turn loop. + # "active" — Layer 2: ALSO inject lessons inline as text + # blocks in tool_results so the LLM sees them on + # the very next round. Stronger learning signal, + # but more invasive — the LLM has to handle the + # nudge gracefully without confusing it for a + # user instruction. + _mode_raw = os.environ.get("ANTON_ACC_MODE", "passive").strip().lower() + self._acc_mode = _mode_raw if _mode_raw in ("off", "passive", "active") else "passive" # Scratchpad observers — list of objects with on_pre_execute / # on_post_execute. Fired by handle_scratchpad around pad.execute. # The runtime never sees this list; observation lives at the # dispatcher layer to keep local/remote runtimes interchangeable. + # ACC is intentionally NOT in this list — its emit footprint + # is broader than scratchpad cells (it also needs to see tool + # calls, history repairs, the round cap), so it's wired via + # direct `session._acc.observe(...)` at each emit site. self._scratchpad_observers: list = [self._cerebellum] self._explainability_store = ( ExplainabilityStore(config.workspace.base) if config.workspace is not None else None @@ -770,6 +818,15 @@ def _seal_dangling_tool_uses(self, reason: str = "interrupted") -> int: last_assistant_idx + 1, {"role": "user", "content": synth_blocks}, ) + # ACC: emit history_repair so detect_repair_churn can fire + # when the LLM is generating malformed tool_use/result pairs + # repeatedly. One repair is a hiccup; three in a turn is the + # conversation derailing. + self._acc_observe( + "history_repair", + {"reason": reason, "sealed_count": len(missing)}, + severity=5, + ) return len(missing) def hard_truncate_history(self, keep: int = 4) -> None: @@ -933,6 +990,141 @@ def factory_validated(): async for event in self._llm.plan_stream(messages=factory_validated(), **kwargs): yield event + def _acc_observe( + self, + kind: str, + detail: dict | None = None, + *, + severity: int = 1, + round_idx: int = 0, + ) -> None: + """Safe-emit wrapper for ACC events. + + Returns silently when: + - the ACC isn't attached (defensive — should always be set), + - the cortex is disabled (`mode == "off"`), so observation + without persistence is pointless, + - `observe()` raises (e.g. unknown kind from a stale call site). + + Emit sites call this rather than touching `self._acc` directly + so that adding/renaming kinds, or turning the ACC off via a + future env var, lives in one place. + """ + acc = getattr(self, "_acc", None) + if acc is None: + return + if getattr(self, "_acc_mode", "passive") == "off": + return + cortex = getattr(self, "_cortex", None) + if cortex is not None and getattr(cortex, "mode", "") == "off": + return + try: + acc.observe(kind, detail or {}, severity=severity, round_idx=round_idx) + except ValueError: + # Unknown event kind from a stale emit site — log via the + # cerebellum's logger contract once we have one; for now, + # swallow so observation drift never breaks a turn. + pass + + def _acc_maybe_nudge(self, tool_results: list[dict]) -> int: + """Layer 2 — mid-turn nudging. + + If `ANTON_ACC_MODE == "active"`, run the ACC's per-round + detection pass and append any newly-fired lessons as text + blocks INSIDE the `tool_results` content list. They piggy-back + on the user-role message that's about to be appended to + history, so the LLM sees them on its very next round. + + Why text blocks alongside tool_result blocks (vs. a separate + user message)? Anthropic's API allows a user message to mix + types in its content array. Reusing the same message keeps the + nudge tightly bound to the round that produced it and avoids + introducing a new consecutive-user-message edge case that the + history validator would have to learn about. + + Returns the number of nudges appended (mostly for tests / + observability). Zero in passive mode, zero when no detectors + newly fired. + """ + if getattr(self, "_acc_mode", "passive") != "active": + return 0 + acc = getattr(self, "_acc", None) + if acc is None: + return 0 + try: + lessons = acc.at_round_n() + except Exception: + # Defensive: a buggy detector should never crash the turn. + # Layer 1 still drains at end-of-turn so we lose nothing. + return 0 + if not lessons: + return 0 + for lesson in lessons: + tool_results.append({ + "type": "text", + "text": ( + f"[Anton self-check — {lesson.detector}] {lesson.rule} " + "(This is an automatic mid-turn observation from your own " + "monitoring layer, not a user message.)" + ), + }) + return len(lessons) + + def _schedule_acc_flush(self) -> None: + """Drain the ACC's turn buffer into Engrams and clear it. + + Parallel to `_schedule_cerebellum_flush()`: same fire-and- + forget contract, same end-of-turn slot. The ACC's detectors + are pure functions (no LLM call), so running them is cheap; + the only async work is `cortex.encode()`, which writes the + lessons to disk. We still wrap it in `asyncio.create_task` + so the user-facing reply isn't blocked on file I/O. + + Best-effort: if there's no event loop (sync test, edge case), + we drop the buffer rather than raise. + """ + acc = getattr(self, "_acc", None) + if acc is None: + return + cortex = getattr(self, "_cortex", None) + if cortex is None or getattr(cortex, "mode", "") == "off": + acc.clear() + return + + lessons = acc.at_end_of_turn() + if not lessons: + acc.clear() + return + + engrams = [ + Engram( + text=l.rule, + kind=l.kind, # always / never / when from the detector + scope="global", # ACC lessons are cross-project + confidence="high", # detectors only fire on confirmed patterns + source="consolidation", + ) + for l in lessons + ] + + # Check for a running event loop first so we don't construct a + # coroutine object only to drop it (which triggers an unawaited- + # coroutine warning). ACC learning is best-effort, same as + # cerebellum learning — if there's no loop we drop the buffer. + try: + asyncio.get_running_loop() + except RuntimeError: + acc.clear() + return + + async def _drain() -> None: + try: + await cortex.encode(engrams) + finally: + acc.clear() + + asyncio.create_task(_drain()) + def _schedule_cerebellum_flush(self) -> None: """Fire the cerebellum's batched diff pass without blocking the turn. @@ -1073,6 +1265,7 @@ async def turn(self, user_input: str | list[dict]) -> str: # in the background. Brain analogue: cerebellar plasticity # operates in parallel with continued action, not blocking it. self._schedule_cerebellum_flush() + self._schedule_acc_flush() return reply @@ -1196,6 +1389,7 @@ async def turn_stream( # the non-streaming turn. Lets the user-facing stream finish # immediately while supervised error learning runs in the background. self._schedule_cerebellum_flush() + self._schedule_acc_flush() async def _stream_and_handle_tools( self, user_message: str = "" @@ -1274,6 +1468,12 @@ async def _stream_and_handle_tools( tool_round += 1 if tool_round > self._max_tool_rounds: _max_rounds_hit = True + self._acc_observe( + "cap_exhausted", + {"cap": self._max_tool_rounds}, + severity=9, + round_idx=tool_round, + ) self._append_history( {"role": "assistant", "content": llm_response.content or ""} ) @@ -1317,6 +1517,17 @@ async def _stream_and_handle_tools( tool_results: list[dict] = [] for tc in llm_response.tool_calls: + # ACC: tool_call emit. Args_summary is intentionally + # truncated — the ACC vocabulary documents it as a + # summary string, not a full payload. Detectors + # don't read args today; this is reserved for a + # future `detect_orphaned_tool_call`. + self._acc_observe( + "tool_call", + {"name": tc.name, "args_summary": str(tc.input)[:120]}, + severity=1, + round_idx=tool_round, + ) if self._episodic is not None: self._episodic.log_turn( self._turn_count + 1, @@ -1481,6 +1692,28 @@ async def _stream_and_handle_tools( result_text = self._apply_error_tracking( result_text, tc.name, error_streak, resilience_nudged ) + # ACC: tool_result emit. Heuristic success-detection + # from the result text — anton-core does not have a + # structured success/error envelope at this layer, + # so we look for the conventional "Tool 'X' failed" + # prefix that the exception branch above sets, plus + # any handler that prefixed its return with "Error:" + # or the dispatcher's own error-tracking markers. + _failed = ( + f"Tool '{tc.name}' failed:" in result_text + or result_text.startswith("Error:") + or "ERROR:" in result_text[:200].upper() + ) + self._acc_observe( + "tool_result", + { + "name": tc.name, + "success": not _failed, + "error": result_text[:300] if _failed else "", + }, + severity=5 if _failed else 1, + round_idx=tool_round, + ) tool_results.append( { "type": "tool_result", @@ -1489,6 +1722,12 @@ async def _stream_and_handle_tools( } ) + # ACC Layer 2 — mid-turn nudge. No-op when mode != "active" + # or when no new patterns fired this round. When it does + # fire, the lesson text appears inline alongside tool_results + # so the LLM sees the alarm before its next decision. + self._acc_maybe_nudge(tool_results) + self._append_history({"role": "user", "content": tool_results}) # Signal that tools are done and LLM is now reasoning diff --git a/anton/core/tools/tool_handlers.py b/anton/core/tools/tool_handlers.py index 8ba69465..f607307c 100644 --- a/anton/core/tools/tool_handlers.py +++ b/anton/core/tools/tool_handlers.py @@ -294,12 +294,34 @@ async def handle_scratchpad(session: ChatSession, tc_input: dict) -> str: if not name: return "Scratchpad name is required." + # ACC emit helper: use the session's safe wrapper if it exists, + # otherwise no-op. Defined as a local closure so each emit site + # stays a single line. + def _acc_observe(kind: str, detail: dict, *, severity: int = 1) -> None: + fn = getattr(session, "_acc_observe", None) + if fn is not None: + fn(kind, detail, severity=severity) + if action == "exec": result = await prepare_scratchpad_exec(session, tc_input) if isinstance(result, str): + # Empty / malformed code parameter — the dispatcher rejected + # it before reaching the runtime. This is exactly the + # "silent code-clip" failure mode the ACC's + # detect_oversized_cell watches for. + _acc_observe("scratchpad_empty_code", {"name": name}, severity=7) return result pad, code, description, estimated_time, estimated_seconds = result + _acc_observe( + "scratchpad_call", + { + "name": name, + "code_len": len(code or ""), + "one_line_description": description or "", + }, + ) + # Notify pre-execute observers (e.g. cerebellum). The runtime # never sees these — observation is an orchestration concern, # so it lives at the dispatcher layer where the data is most @@ -325,6 +347,31 @@ async def handle_scratchpad(session: ChatSession, tc_input: dict) -> str: pad_name=name, description=description, cell=cell, ) await _fire_post_execute(session, cell) + # ACC: distinguish "killed" (timeout/cancel/OOM) from a + # plain runtime error. The local backend sets cell.error + # to a string starting with "Cancelled" or matching the + # "Cell timed out"/"Cell killed" prefixes from the + # asyncio.TimeoutError path. Everything else (NameError, + # ImportError, …) is a regular result with success=False. + err = (cell.error or "").strip() + if err.startswith(("Cancelled", "Cell timed out", "Cell killed")): + _acc_observe( + "scratchpad_killed", + {"name": name, "reason": err[:120]}, + severity=6, + ) + else: + success = not err and not (cell.stderr or "").strip() + _acc_observe( + "scratchpad_result", + { + "name": name, + "success": success, + "stdout_len": len(cell.stdout or ""), + "error": err[:300] if err else "", + }, + severity=5 if not success else 1, + ) return format_cell_result(cell) elif action == "view": @@ -338,6 +385,11 @@ async def handle_scratchpad(session: ChatSession, tc_input: dict) -> str: if pad is None: return f"No scratchpad named '{name}'." await pad.reset() + _acc_observe( + "scratchpad_reset", + {"name": name, "reason": "manual"}, + severity=5, + ) return f"Scratchpad '{name}' reset. All state cleared." elif action == "remove": diff --git a/tests/fixtures/acc/kill_loop.json b/tests/fixtures/acc/kill_loop.json new file mode 100644 index 00000000..28d3c7d4 --- /dev/null +++ b/tests/fixtures/acc/kill_loop.json @@ -0,0 +1,6 @@ +[ + {"kind": "scratchpad_call", "severity": 1, "detail": {"name": "compute", "code_len": 420}, "round_idx": 2}, + {"kind": "scratchpad_killed", "severity": 6, "detail": {"name": "compute", "reason": "timeout"}, "round_idx": 3}, + {"kind": "scratchpad_call", "severity": 1, "detail": {"name": "compute", "code_len": 420}, "round_idx": 5}, + {"kind": "scratchpad_killed", "severity": 6, "detail": {"name": "compute", "reason": "timeout"}, "round_idx": 6} +] diff --git a/tests/fixtures/acc/name_switch.json b/tests/fixtures/acc/name_switch.json new file mode 100644 index 00000000..0dee02b6 --- /dev/null +++ b/tests/fixtures/acc/name_switch.json @@ -0,0 +1,7 @@ +[ + {"kind": "scratchpad_call", "severity": 1, "detail": {"name": "build_pres", "code_len": 480}, "round_idx": 3}, + {"kind": "scratchpad_result", "severity": 1, "detail": {"name": "build_pres", "success": true, "stdout_len": 64}, "round_idx": 3}, + {"kind": "scratchpad_call", "severity": 1, "detail": {"name": "write_html", "code_len": 612}, "round_idx": 7}, + {"kind": "scratchpad_result", "severity": 5, "detail": {"name": "write_html", "success": false, "error": "NameError: name 'data' is not defined"}, "round_idx": 7}, + {"kind": "scratchpad_call", "severity": 1, "detail": {"name": "pres1", "code_len": 320}, "round_idx": 9} +] diff --git a/tests/fixtures/acc/oversized_cell.json b/tests/fixtures/acc/oversized_cell.json new file mode 100644 index 00000000..1d2d8478 --- /dev/null +++ b/tests/fixtures/acc/oversized_cell.json @@ -0,0 +1,8 @@ +[ + {"kind": "scratchpad_call", "severity": 1, "detail": {"name": "dash", "code_len": 320}, "round_idx": 2}, + {"kind": "scratchpad_result", "severity": 1, "detail": {"name": "dash", "success": true, "stdout_len": 16}, "round_idx": 2}, + {"kind": "scratchpad_call", "severity": 1, "detail": {"name": "dash", "code_len": 18200}, "round_idx": 4}, + {"kind": "scratchpad_empty_code", "severity": 7, "detail": {"name": "dash"}, "round_idx": 4}, + {"kind": "scratchpad_call", "severity": 1, "detail": {"name": "dash", "code_len": 19500}, "round_idx": 5}, + {"kind": "scratchpad_empty_code", "severity": 7, "detail": {"name": "dash"}, "round_idx": 5} +] diff --git a/tests/fixtures/acc/publish_failure_loop.json b/tests/fixtures/acc/publish_failure_loop.json new file mode 100644 index 00000000..a68f5129 --- /dev/null +++ b/tests/fixtures/acc/publish_failure_loop.json @@ -0,0 +1,8 @@ +[ + {"kind": "tool_call", "severity": 1, "detail": {"name": "publish_or_preview", "args_summary": "action=publish"}, "round_idx": 6}, + {"kind": "tool_result", "severity": 5, "detail": {"name": "publish_or_preview", "success": false, "error": "PUBLISH FAILED: settings module unavailable"}, "round_idx": 6}, + {"kind": "tool_call", "severity": 1, "detail": {"name": "publish_or_preview", "args_summary": "action=publish"}, "round_idx": 8}, + {"kind": "tool_result", "severity": 5, "detail": {"name": "publish_or_preview", "success": false, "error": "PUBLISH FAILED: settings module unavailable"}, "round_idx": 8}, + {"kind": "tool_call", "severity": 1, "detail": {"name": "publish_or_preview", "args_summary": "action=publish"}, "round_idx": 10}, + {"kind": "tool_result", "severity": 6, "detail": {"name": "publish_or_preview", "success": false, "error": "PUBLISH FAILED: settings module unavailable"}, "round_idx": 10} +] diff --git a/tests/fixtures/acc/reset_churn.json b/tests/fixtures/acc/reset_churn.json new file mode 100644 index 00000000..796272a0 --- /dev/null +++ b/tests/fixtures/acc/reset_churn.json @@ -0,0 +1,8 @@ +[ + {"kind": "scratchpad_call", "severity": 1, "detail": {"name": "dash", "code_len": 320}, "round_idx": 2}, + {"kind": "scratchpad_result","severity": 5, "detail": {"name": "dash", "success": false, "error": "ValueError: shape mismatch"}, "round_idx": 2}, + {"kind": "scratchpad_reset", "severity": 5, "detail": {"name": "dash", "reason": "manual_clear"}, "round_idx": 4}, + {"kind": "scratchpad_call", "severity": 1, "detail": {"name": "dash", "code_len": 280}, "round_idx": 5}, + {"kind": "scratchpad_result","severity": 5, "detail": {"name": "dash", "success": false, "error": "ImportError: pandas"}, "round_idx": 5}, + {"kind": "scratchpad_reset", "severity": 5, "detail": {"name": "dash", "reason": "manual_clear"}, "round_idx": 7} +] diff --git a/tests/test_acc.py b/tests/test_acc.py new file mode 100644 index 00000000..2ed7f114 --- /dev/null +++ b/tests/test_acc.py @@ -0,0 +1,639 @@ +"""Tests for `anton.core.memory.acc` — the ACC error-detection module. + +Layout mirrors the design contract documented in `acc.py`: + + * Layer 1 — pure-function detector tests. Each `detect_*` is + exercised with one positive case (the pattern fires) and one + negative case (the pattern does not fire). No state, no I/O. + + * Layer 2 — `AnteriorCingulate` unit tests. State management + (`observe`, `clear`), vocabulary discipline (unknown kinds + raise), and the dedupe seam (`has_similar_lesson`). + + * Layer 3 — replay tests. Real captured event traces from + antontron sessions live as JSON in `tests/fixtures/acc/`. Each + fixture asserts the exact lesson the ACC should produce. This + layer is the living regression suite — every new failure mode + Anton encounters in the wild can be captured here once and + locked in forever after. + + * Layer 4 — vocabulary sanity. A single test that walks every + `kind` in `EVENT_KINDS` and asserts at least one detector + reads it. Prevents the event taxonomy from drifting away from + the detectors that consume it. + +No LLM calls in any test. Detectors are pure; dedupe is stubbed. +""" + +from __future__ import annotations + +import inspect +import json +from pathlib import Path + +import pytest + +from anton.core.memory.acc import ( + DETECTORS, + EVENT_KINDS, + AnteriorCingulate, + Event, + Lesson, + detect_cap_exhausted, + detect_kill_loop, + detect_name_switch, + detect_oversized_cell, + detect_repair_churn, + detect_repeated_error_signature, + detect_repeated_tool_error, + detect_reset_churn, + detect_severity_climb, +) + + +FIXTURES = Path(__file__).parent / "fixtures" / "acc" + + +def _load_fixture(name: str) -> list[Event]: + """Load a fixture JSON and return Event objects.""" + data = json.loads((FIXTURES / name).read_text(encoding="utf-8")) + return [ + Event( + kind=item["kind"], + severity=int(item.get("severity", 1)), + detail=dict(item.get("detail", {})), + round_idx=int(item.get("round_idx", 0)), + ) + for item in data + ] + + +# ───────────────────────────────────────────────────────────────────────────── +# Layer 1 — pure-function detector tests +# ───────────────────────────────────────────────────────────────────────────── + + +class TestDetectNameSwitch: + def test_fires_on_two_distinct_names(self): + events = [ + Event("scratchpad_call", 1, {"name": "build_pres"}, 3), + Event("scratchpad_call", 1, {"name": "write_html"}, 7), + ] + lesson = detect_name_switch(events) + assert lesson is not None + assert lesson.detector == "detect_name_switch" + # The phrase the LLM is supposed to internalise. + assert "ONE scratchpad" in lesson.rule + + def test_silent_on_single_name(self): + events = [ + Event("scratchpad_call", 1, {"name": "dash"}, i) for i in range(5) + ] + assert detect_name_switch(events) is None + + def test_silent_on_empty_event_list(self): + assert detect_name_switch([]) is None + + def test_ignores_other_event_kinds(self): + # tool_call has no `name` field for *scratchpad*; the detector + # should not be tricked into firing on tool names. + events = [ + Event("tool_call", 1, {"name": "scratchpad", "args_summary": "..."}, 1), + Event("tool_call", 1, {"name": "publish_or_preview", "args_summary": "..."}, 2), + ] + assert detect_name_switch(events) is None + + +class TestDetectOversizedCell: + def test_fires_on_observed_empty_code(self): + # One big cell plus the empty-code failure mode IS the smoking + # gun. Should fire even though only one big cell is present. + events = [ + Event("scratchpad_call", 1, {"name": "dash", "code_len": 18000}, 4), + Event("scratchpad_empty_code", 7, {"name": "dash"}, 4), + ] + lesson = detect_oversized_cell(events) + assert lesson is not None + assert "5 KB" in lesson.rule or "5 KB" in lesson.rule.replace(" ", " ") + + def test_fires_on_two_or_more_big_cells_even_without_empty_code(self): + events = [ + Event("scratchpad_call", 1, {"name": "dash", "code_len": 9000}, 2), + Event("scratchpad_call", 1, {"name": "dash", "code_len": 11000}, 4), + ] + assert detect_oversized_cell(events) is not None + + def test_silent_on_single_big_cell(self): + # One isolated big cell isn't pattern enough. Detector only + # fires when the pattern is repeated OR the failure mode + # actually manifested. + events = [Event("scratchpad_call", 1, {"name": "dash", "code_len": 10000}, 2)] + assert detect_oversized_cell(events) is None + + def test_silent_when_all_cells_are_small(self): + events = [ + Event("scratchpad_call", 1, {"name": "dash", "code_len": n}, i) + for i, n in enumerate([200, 500, 1200, 800]) + ] + assert detect_oversized_cell(events) is None + + +class TestDetectRepeatedToolError: + def test_fires_on_two_consecutive_failures_of_same_tool(self): + events = [ + Event("tool_result", 1, {"name": "publish_or_preview", "success": False, "error": "x"}, 6), + Event("tool_result", 1, {"name": "publish_or_preview", "success": False, "error": "x"}, 8), + ] + lesson = detect_repeated_tool_error(events) + assert lesson is not None + assert "don't retry" in lesson.rule.lower() + + def test_silent_when_failures_are_for_different_tools(self): + events = [ + Event("tool_result", 1, {"name": "publish_or_preview", "success": False}, 6), + Event("tool_result", 1, {"name": "lookup_connector", "success": False}, 8), + ] + assert detect_repeated_tool_error(events) is None + + def test_success_resets_the_run(self): + events = [ + Event("tool_result", 1, {"name": "publish_or_preview", "success": False}, 6), + Event("tool_result", 1, {"name": "publish_or_preview", "success": True}, 8), + Event("tool_result", 1, {"name": "publish_or_preview", "success": False}, 10), + ] + # Longest consecutive failure run is 1 → does not fire. + assert detect_repeated_tool_error(events) is None + + def test_silent_on_single_failure(self): + events = [ + Event("tool_result", 1, {"name": "publish_or_preview", "success": False}, 6), + ] + assert detect_repeated_tool_error(events) is None + + +class TestDetectRepeatedErrorSignature: + def test_fires_on_same_error_three_times_across_tools(self): + # The unifier — same normalised signature across DIFFERENT tools + # should fire, where detect_repeated_tool_error would not. + events = [ + Event("tool_result", 5, {"name": "a", "success": False, "error": "Refusing to save record for engine='gmail-1'"}, 3), + Event("tool_result", 5, {"name": "b", "success": False, "error": "Refusing to save record for engine='gmail-2'"}, 4), + Event("tool_result", 5, {"name": "c", "success": False, "error": "Refusing to save record for engine='gmail-3'"}, 5), + ] + lesson = detect_repeated_error_signature(events) + assert lesson is not None + assert lesson.detector == "detect_repeated_error_signature" + assert "same error message" in lesson.rule.lower() + + def test_silent_on_two_same_errors(self): + # Threshold is 3 — two same-errors is one legitimate retry. + events = [ + Event("tool_result", 5, {"name": "x", "success": False, "error": "boom"}, 1), + Event("tool_result", 5, {"name": "x", "success": False, "error": "boom"}, 2), + ] + assert detect_repeated_error_signature(events) is None + + def test_silent_when_errors_are_distinct(self): + events = [ + Event("tool_result", 5, {"name": "x", "success": False, "error": "auth failed"}, 1), + Event("tool_result", 5, {"name": "x", "success": False, "error": "rate limited"}, 2), + Event("tool_result", 5, {"name": "x", "success": False, "error": "schema invalid"}, 3), + ] + assert detect_repeated_error_signature(events) is None + + def test_reads_scratchpad_results_too(self): + events = [ + Event("scratchpad_result", 5, {"name": "p", "success": False, "error": "NameError: 'data' undefined"}, 1), + Event("scratchpad_result", 5, {"name": "p", "success": False, "error": "NameError: 'data' undefined"}, 2), + Event("scratchpad_result", 5, {"name": "p", "success": False, "error": "NameError: 'data' undefined"}, 3), + ] + assert detect_repeated_error_signature(events) is not None + + def test_success_events_dont_count(self): + events = [ + Event("tool_result", 1, {"name": "x", "success": True}, 1), + Event("tool_result", 1, {"name": "x", "success": True}, 2), + Event("tool_result", 1, {"name": "x", "success": True}, 3), + ] + assert detect_repeated_error_signature(events) is None + + +class TestDetectResetChurn: + def test_fires_on_two_resets(self): + events = [ + Event("scratchpad_reset", 5, {"name": "dash", "reason": "manual"}, 4), + Event("scratchpad_reset", 5, {"name": "dash", "reason": "manual"}, 8), + ] + lesson = detect_reset_churn(events) + assert lesson is not None + assert lesson.detector == "detect_reset_churn" + assert "reset" in lesson.rule.lower() + + def test_silent_on_single_reset(self): + events = [Event("scratchpad_reset", 5, {"name": "dash"}, 4)] + assert detect_reset_churn(events) is None + + +class TestDetectKillLoop: + def test_fires_on_two_kills_same_name(self): + events = [ + Event("scratchpad_killed", 6, {"name": "compute", "reason": "timeout"}, 3), + Event("scratchpad_killed", 6, {"name": "compute", "reason": "timeout"}, 6), + ] + lesson = detect_kill_loop(events) + assert lesson is not None + assert lesson.detector == "detect_kill_loop" + + def test_silent_when_kills_are_for_different_names(self): + events = [ + Event("scratchpad_killed", 6, {"name": "a", "reason": "timeout"}, 1), + Event("scratchpad_killed", 6, {"name": "b", "reason": "timeout"}, 2), + ] + assert detect_kill_loop(events) is None + + def test_silent_on_single_kill(self): + events = [Event("scratchpad_killed", 6, {"name": "compute"}, 3)] + assert detect_kill_loop(events) is None + + +class TestDetectSeverityClimb: + def test_fires_on_strictly_increasing_run_to_high_severity(self): + events = [ + Event("tool_result", 1, {"name": "publish", "success": True}, 1), + Event("tool_result", 3, {"name": "publish", "success": False, "error": "minor"}, 2), + Event("tool_result", 5, {"name": "publish", "success": False, "error": "worse"}, 3), + Event("tool_result", 7, {"name": "publish", "success": False, "error": "very bad"}, 4), + ] + lesson = detect_severity_climb(events) + assert lesson is not None + assert lesson.detector == "detect_severity_climb" + + def test_silent_on_flat_severities(self): + events = [ + Event("tool_result", 5, {"name": "x", "success": False}, 1), + Event("tool_result", 5, {"name": "x", "success": False}, 2), + Event("tool_result", 5, {"name": "x", "success": False}, 3), + ] + assert detect_severity_climb(events) is None + + def test_silent_when_peak_below_threshold(self): + # Strictly increasing but never reaches _SEVERITY_CLIMB_PEAK. + events = [ + Event("tool_result", 1, {"name": "x", "success": True}, 1), + Event("tool_result", 2, {"name": "x", "success": True}, 2), + Event("tool_result", 3, {"name": "x", "success": True}, 3), + ] + assert detect_severity_climb(events) is None + + def test_silent_when_climb_is_split_across_producers(self): + # Climbing-severity events for DIFFERENT names → not a single + # producer's deteriorating sequence. + events = [ + Event("tool_result", 1, {"name": "a", "success": True}, 1), + Event("tool_result", 5, {"name": "b", "success": False, "error": "x"}, 2), + Event("tool_result", 7, {"name": "c", "success": False, "error": "y"}, 3), + ] + assert detect_severity_climb(events) is None + + +class TestDetectRepairChurn: + def test_fires_on_three_repairs(self): + events = [ + Event("history_repair", 5, {"reason": "orphan_tool_use"}, 2), + Event("history_repair", 5, {"reason": "orphan_tool_use"}, 4), + Event("history_repair", 5, {"reason": "orphan_tool_use"}, 7), + ] + lesson = detect_repair_churn(events) + assert lesson is not None + assert lesson.detector == "detect_repair_churn" + + def test_silent_on_two_repairs(self): + events = [ + Event("history_repair", 5, {"reason": "x"}, 2), + Event("history_repair", 5, {"reason": "x"}, 4), + ] + assert detect_repair_churn(events) is None + + +class TestDetectCapExhausted: + def test_fires_on_single_occurrence(self): + events = [Event("cap_exhausted", 9, {}, 25)] + lesson = detect_cap_exhausted(events) + assert lesson is not None + assert lesson.detector == "detect_cap_exhausted" + assert "round cap" in lesson.rule.lower() + + def test_silent_when_absent(self): + events = [Event("tool_result", 1, {"name": "x", "success": True}, 1)] + assert detect_cap_exhausted(events) is None + + +# ───────────────────────────────────────────────────────────────────────────── +# Layer 2 — AnteriorCingulate state tests +# ───────────────────────────────────────────────────────────────────────────── + + +class TestAnteriorCingulate: + def test_observe_appends_to_buffer(self): + acc = AnteriorCingulate() + acc.observe("scratchpad_call", {"name": "x", "code_len": 100}) + assert len(acc.events) == 1 + assert acc.events[0].kind == "scratchpad_call" + + def test_observe_rejects_unknown_kind(self): + acc = AnteriorCingulate() + with pytest.raises(ValueError, match="Unknown ACC event kind"): + acc.observe("scratchpad_explosion", {}) + + def test_clear_drops_buffer(self): + acc = AnteriorCingulate() + acc.observe("scratchpad_call", {"name": "x", "code_len": 100}) + acc.clear() + assert len(acc.events) == 0 + + def test_at_end_of_turn_returns_lessons_in_detector_order(self): + acc = AnteriorCingulate() + # Trigger both name_switch and repeated_tool_error. + acc.observe("scratchpad_call", {"name": "a", "code_len": 200}, round_idx=1) + acc.observe("scratchpad_call", {"name": "b", "code_len": 200}, round_idx=2) + acc.observe("tool_result", {"name": "t", "success": False}, round_idx=3) + acc.observe("tool_result", {"name": "t", "success": False}, round_idx=4) + lessons = acc.at_end_of_turn() + # DETECTORS order: name_switch, oversized_cell (silent here), + # repeated_tool_error. So name_switch first, then tool_error. + assert [l.detector for l in lessons] == [ + "detect_name_switch", + "detect_repeated_tool_error", + ] + + def test_has_similar_lesson_blocks_persistence(self): + # Stub: pretend memory already knows the name-switch rule. + def already_known(rule: str) -> bool: + return "ONE scratchpad" in rule + + acc = AnteriorCingulate(has_similar_lesson=already_known) + acc.observe("scratchpad_call", {"name": "a", "code_len": 100}, round_idx=1) + acc.observe("scratchpad_call", {"name": "b", "code_len": 100}, round_idx=2) + lessons = acc.at_end_of_turn() + assert lessons == [] # de-duped against existing memory + + def test_cross_detector_dedupe(self): + # Belt-and-suspenders: if two detectors ever produce the same + # rule string, we don't write it twice in one turn. + called = {"count": 0} + + def fake_detector(events): + called["count"] += 1 + return Lesson(rule="duplicate rule", kind="when", triggers=(), detector=f"fake_{called['count']}") + + acc = AnteriorCingulate(detectors=(fake_detector, fake_detector)) + acc.observe("scratchpad_call", {"name": "x", "code_len": 100}) + lessons = acc.at_end_of_turn() + assert len(lessons) == 1 + assert lessons[0].rule == "duplicate rule" + + def test_event_kind_counts(self): + acc = AnteriorCingulate() + acc.observe("scratchpad_call", {"name": "a", "code_len": 100}) + acc.observe("scratchpad_call", {"name": "b", "code_len": 100}) + acc.observe("tool_result", {"name": "t", "success": False}) + assert acc.event_kind_counts == {"scratchpad_call": 2, "tool_result": 1} + + +class TestAtRoundN: + """Layer 2 — mid-turn nudging contract. + + Each detector is allowed to nudge AT MOST ONCE per turn. + Subsequent calls during the same turn only return lessons from + detectors that haven't nudged yet. `clear()` resets the nudge + tracking so the next turn starts fresh. + """ + + def test_returns_lessons_for_newly_fired_detectors(self): + acc = AnteriorCingulate() + acc.observe("scratchpad_call", {"name": "a", "code_len": 100}, round_idx=1) + acc.observe("scratchpad_call", {"name": "b", "code_len": 100}, round_idx=2) + lessons = acc.at_round_n() + assert len(lessons) == 1 + assert lessons[0].detector == "detect_name_switch" + + def test_second_call_same_turn_returns_empty_when_no_new_pattern(self): + acc = AnteriorCingulate() + acc.observe("scratchpad_call", {"name": "a", "code_len": 100}, round_idx=1) + acc.observe("scratchpad_call", {"name": "b", "code_len": 100}, round_idx=2) + first = acc.at_round_n() + second = acc.at_round_n() + assert len(first) == 1 + assert second == [] + + def test_second_call_returns_new_pattern_when_one_emerges(self): + # Round 1: name_switch fires. Round 2: tool-error pattern emerges. + # Second call should return ONLY the tool-error lesson, not the + # already-nudged name-switch lesson. + acc = AnteriorCingulate() + acc.observe("scratchpad_call", {"name": "a", "code_len": 100}, round_idx=1) + acc.observe("scratchpad_call", {"name": "b", "code_len": 100}, round_idx=2) + first = acc.at_round_n() + assert [l.detector for l in first] == ["detect_name_switch"] + + acc.observe("tool_result", {"name": "t", "success": False}, round_idx=3) + acc.observe("tool_result", {"name": "t", "success": False}, round_idx=4) + second = acc.at_round_n() + assert [l.detector for l in second] == ["detect_repeated_tool_error"] + + def test_ignores_has_similar_lesson(self): + """Mid-turn nudges should fire even when the rule already + lives in memory. Skipping based on memory dedupe would mean + the LLM gets no in-context reminder of a rule it's actively + violating right now.""" + def always_known(_rule): + return True + + acc = AnteriorCingulate(has_similar_lesson=always_known) + acc.observe("scratchpad_call", {"name": "a", "code_len": 100}, round_idx=1) + acc.observe("scratchpad_call", {"name": "b", "code_len": 100}, round_idx=2) + assert acc.at_round_n() != [] # fires even though rule "is in memory" + + def test_clear_resets_nudge_tracking(self): + acc = AnteriorCingulate() + acc.observe("scratchpad_call", {"name": "a", "code_len": 100}, round_idx=1) + acc.observe("scratchpad_call", {"name": "b", "code_len": 100}, round_idx=2) + first = acc.at_round_n() + assert len(first) == 1 + # Simulate a turn boundary. + acc.clear() + acc.observe("scratchpad_call", {"name": "a", "code_len": 100}, round_idx=1) + acc.observe("scratchpad_call", {"name": "b", "code_len": 100}, round_idx=2) + again = acc.at_round_n() + # Fresh turn: same pattern fires again. + assert len(again) == 1 + + def test_silent_when_no_patterns(self): + acc = AnteriorCingulate() + acc.observe("scratchpad_call", {"name": "solo", "code_len": 50}, round_idx=1) + assert acc.at_round_n() == [] + + +# ───────────────────────────────────────────────────────────────────────────── +# Layer 3 — replay tests +# +# Each fixture is a real captured failure mode. The assertions lock +# in the exact lesson ACC should produce. To add a new fixture: +# +# 1. Drop a JSON file in tests/fixtures/acc/. +# 2. Add a method below. +# 3. If no existing detector catches it, write one in acc.py and +# add unit-test coverage in Layer 1. +# ───────────────────────────────────────────────────────────────────────────── + + +class TestReplay: + def _run(self, fixture: str) -> list[Lesson]: + acc = AnteriorCingulate() + for e in _load_fixture(fixture): + acc.observe(e.kind, e.detail, severity=e.severity, round_idx=e.round_idx) + return acc.at_end_of_turn() + + def test_replay_name_switch(self): + lessons = self._run("name_switch.json") + rules = {l.rule for l in lessons} + assert any("ONE scratchpad" in r for r in rules), ( + f"name_switch fixture should produce a name-switch lesson, got: {rules}" + ) + + def test_replay_oversized_cell(self): + lessons = self._run("oversized_cell.json") + rules = {l.rule for l in lessons} + assert any("5 KB" in r for r in rules), ( + f"oversized_cell fixture should produce the cell-size lesson, got: {rules}" + ) + + def test_replay_publish_failure_loop(self): + lessons = self._run("publish_failure_loop.json") + rules = {l.rule for l in lessons} + # The fixture has three identical failures of the same tool with + # the same error message — it should fire BOTH the tool-level + # retry detector AND the broader error-signature detector. + assert any("don't retry" in r.lower() for r in rules), ( + f"publish_failure_loop fixture should produce the retry-loop lesson, got: {rules}" + ) + assert any("same error message" in r.lower() for r in rules), ( + f"publish_failure_loop fixture should also produce the repeated-signature lesson, got: {rules}" + ) + + def test_replay_reset_churn(self): + lessons = self._run("reset_churn.json") + rules = {l.rule for l in lessons} + assert any("reset" in r.lower() and "scratchpad" in r.lower() for r in rules), ( + f"reset_churn fixture should produce the state-abandonment lesson, got: {rules}" + ) + + def test_replay_kill_loop(self): + lessons = self._run("kill_loop.json") + rules = {l.rule for l in lessons} + assert any("killed" in r.lower() for r in rules), ( + f"kill_loop fixture should produce the kill-loop lesson, got: {rules}" + ) + + +# ───────────────────────────────────────────────────────────────────────────── +# Layer 4 — vocabulary discipline +# ───────────────────────────────────────────────────────────────────────────── + + +def test_every_event_kind_is_read_by_at_least_one_detector(): + """Every value in EVENT_KINDS should appear as a string literal in + at least one detector's source. Drift between the vocabulary and + the detectors is the rot we're guarding against — when a new kind + is added without a detector that consumes it, this test fails + loudly and the contributor either adds a detector or deletes the + kind from the vocabulary. + + `KNOWN_PRODUCER_ONLY` is the tight, justified allowlist for kinds + that are emitted today but not yet consumed by a detector. Every + entry needs an explicit reason. Adding to this set is a smell — + the bar for a new entry is "we genuinely need this emitted for + correlation/telemetry today AND a detector will land within a + sprint." Otherwise, drop the kind from EVENT_KINDS. + """ + KNOWN_PRODUCER_ONLY = { + # tool_call carries `args_summary`, paired with `tool_result` + # by round_idx. No current detector reads it (the actionable + # signal is the result, not the call). Reserved for a future + # `detect_orphaned_tool_call` (call with no matching result = + # transport dropped the response). Keeping the emit point is + # cheap; adding it later would require re-instrumenting every + # tool dispatch site. + "tool_call", + } + detector_sources = "\n".join(inspect.getsource(d) for d in DETECTORS) + missing = [] + for kind in sorted(EVENT_KINDS): + if kind in KNOWN_PRODUCER_ONLY: + continue + if f'"{kind}"' not in detector_sources: + missing.append(kind) + assert not missing, ( + f"Event kinds in EVENT_KINDS but not read by any detector: {missing}. " + f"Either add a detector that reads them or remove them from EVENT_KINDS. " + f"Producer-only kinds require an entry in KNOWN_PRODUCER_ONLY with a reason." + ) + + +def test_every_lesson_has_a_valid_kind(): + """Every detector must tag its Lesson with a `kind` so the wiring + layer can route it to the right Engram kind (always/never/when) + without parsing the rule text. Detectors that fire here drive the + smoke check; detectors that don't fire in this synthetic mix are + indirectly covered by their own positive unit tests.""" + # Craft an event stream that fires every detector. Some need + # specific shapes — these were copied from the per-detector tests. + events = [ + # name_switch + oversized_cell + Event("scratchpad_call", 1, {"name": "a", "code_len": 9000}, 1), + Event("scratchpad_call", 1, {"name": "b", "code_len": 11000}, 2), + # tool repeats + Event("tool_result", 5, {"name": "t", "success": False, "error": "same err"}, 3), + Event("tool_result", 5, {"name": "t", "success": False, "error": "same err"}, 4), + Event("tool_result", 5, {"name": "t", "success": False, "error": "same err"}, 5), + # reset + kill + Event("scratchpad_reset", 5, {"name": "a"}, 6), + Event("scratchpad_reset", 5, {"name": "a"}, 7), + Event("scratchpad_killed", 6, {"name": "c"}, 8), + Event("scratchpad_killed", 6, {"name": "c"}, 9), + # severity climb on producer "z" + Event("tool_result", 1, {"name": "z", "success": True}, 10), + Event("tool_result", 3, {"name": "z", "success": False, "error": "small"}, 11), + Event("tool_result", 7, {"name": "z", "success": False, "error": "big"}, 12), + # repair churn + Event("history_repair", 5, {"reason": "x"}, 13), + Event("history_repair", 5, {"reason": "x"}, 14), + Event("history_repair", 5, {"reason": "x"}, 15), + # cap exhausted + Event("cap_exhausted", 9, {}, 25), + ] + acc = AnteriorCingulate() + for e in events: + acc.observe(e.kind, e.detail, severity=e.severity, round_idx=e.round_idx) + lessons = acc.at_end_of_turn() + assert lessons, "Crafted event stream should fire multiple detectors" + for l in lessons: + assert l.kind in ("always", "never", "when"), ( + f"Detector {l.detector} produced invalid kind {l.kind!r}; " + f"must be one of always/never/when so Cortex.encode() routes correctly." + ) + + +def test_no_dropped_kinds_lingering_in_event_kinds(): + """Guard against zombies — kinds we explicitly dropped should not + reappear in EVENT_KINDS. If a future reviewer adds them back, this + test forces a conversation about *why* (and probably about adding + a real detector this time). + """ + DROPPED = {"context_compaction", "round_milestone"} + leaked = DROPPED & EVENT_KINDS + assert not leaked, ( + f"These kinds were intentionally dropped from the ACC vocabulary " + f"because no detector consumed them: {leaked}. If you need to " + f"re-add one, add a detector that reads it in the same change." + ) diff --git a/tests/test_session_acc_init.py b/tests/test_session_acc_init.py new file mode 100644 index 00000000..1a0f6732 --- /dev/null +++ b/tests/test_session_acc_init.py @@ -0,0 +1,280 @@ +"""Wiring tests for the ACC integration in ChatSession. + +These tests don't construct a full ChatSession (it requires a live LLM +client and many other dependencies). Instead they verify the contract +points where ChatSession and the ACC meet: + + - `_acc_observe` is a safe-emit wrapper (silent on unknown kinds, + silent when the cortex is disabled). + - `_schedule_acc_flush` drains the ACC buffer, converts each Lesson + into an Engram with the kind the detector tagged, and routes them + through `cortex.encode()`. + +We exercise the methods by binding them onto a `SimpleNamespace` with +the minimum attributes they read. This is the same approach +test_session_skills_init.py uses for the skill-store contract — keeps +the tests fast and pinned to the *seam* under test instead of dragging +in workspaces, LLM clients, and storage backends. +""" + +from __future__ import annotations + +import asyncio +import types +from types import SimpleNamespace + +import pytest + +from anton.core.memory.acc import AnteriorCingulate +from anton.core.memory.base import Engram +from anton.core.session import ChatSession + + +class FakeCortex: + """Minimal stand-in for Cortex used by _schedule_acc_flush. + + Records every batch of engrams passed to `encode()` so the test can + inspect exactly what would land in long-term memory. + """ + + def __init__(self, mode: str = "autopilot"): + self.mode = mode + self.encoded: list[list[Engram]] = [] + self.global_hc = SimpleNamespace(recall_rules=lambda: "") + + async def encode(self, engrams: list[Engram]) -> list[str]: + self.encoded.append(list(engrams)) + return [f"encoded:{e.kind}" for e in engrams] + + +def _make_session_stub(cortex: FakeCortex, *, acc_mode: str = "passive") -> SimpleNamespace: + """Build the smallest object that satisfies _acc_observe, + _schedule_acc_flush, and _acc_maybe_nudge. Binds the real + ChatSession methods onto it so the test exercises production + code, not a re-implementation.""" + stub = SimpleNamespace( + _cortex=cortex, + _acc=AnteriorCingulate(), # default predicate accepts everything + _acc_mode=acc_mode, + ) + stub._acc_observe = types.MethodType(ChatSession._acc_observe, stub) + stub._schedule_acc_flush = types.MethodType(ChatSession._schedule_acc_flush, stub) + stub._acc_maybe_nudge = types.MethodType(ChatSession._acc_maybe_nudge, stub) + return stub + + +class TestAccObserveWrapper: + def test_silent_when_cortex_disabled(self): + cortex = FakeCortex(mode="off") + s = _make_session_stub(cortex) + s._acc_observe("scratchpad_call", {"name": "x", "code_len": 10}) + # Mode=="off" → emit suppressed entirely so we don't accumulate + # events the flush will throw away anyway. + assert s._acc.events == () + + def test_silent_on_unknown_kind(self): + cortex = FakeCortex() + s = _make_session_stub(cortex) + # No exception even though "made_up_kind" isn't in EVENT_KINDS. + # Emit-site drift must never break a turn. + s._acc_observe("made_up_kind", {}) + assert s._acc.events == () + + def test_records_known_kind(self): + cortex = FakeCortex() + s = _make_session_stub(cortex) + s._acc_observe("scratchpad_call", {"name": "x", "code_len": 10}, round_idx=2) + assert len(s._acc.events) == 1 + assert s._acc.events[0].kind == "scratchpad_call" + assert s._acc.events[0].round_idx == 2 + + +class TestScheduleAccFlush: + @pytest.mark.asyncio + async def test_drains_fires_and_encodes_engrams(self): + cortex = FakeCortex() + s = _make_session_stub(cortex) + # Pump events that fire detect_name_switch (always) and + # detect_repeated_tool_error (when). Two different Lesson + # kinds — verifies the wiring preserves the detector's tag. + s._acc_observe("scratchpad_call", {"name": "a", "code_len": 100}, round_idx=1) + s._acc_observe("scratchpad_call", {"name": "b", "code_len": 100}, round_idx=2) + s._acc_observe( + "tool_result", + {"name": "t", "success": False, "error": "boom"}, + severity=5, + round_idx=3, + ) + s._acc_observe( + "tool_result", + {"name": "t", "success": False, "error": "boom"}, + severity=5, + round_idx=4, + ) + + s._schedule_acc_flush() + # Give the create_task'd coroutine a turn to run. + await asyncio.sleep(0) + await asyncio.sleep(0) + + # Buffer cleared. + assert s._acc.events == () + + # Exactly one encode batch. + assert len(cortex.encoded) == 1 + engrams = cortex.encoded[0] + kinds = {e.kind for e in engrams} + # name_switch lands as "always", repeated_tool_error as "when". + assert "always" in kinds + assert "when" in kinds + # All engrams should be global-scope, high-confidence, sourced + # as consolidation — same envelope cerebellum uses. + for e in engrams: + assert e.scope == "global" + assert e.confidence == "high" + assert e.source == "consolidation" + + @pytest.mark.asyncio + async def test_noop_when_no_lessons_fire(self): + cortex = FakeCortex() + s = _make_session_stub(cortex) + # One scratchpad_call → name_switch needs ≥2 names, no other + # detector fires on a single benign event. + s._acc_observe("scratchpad_call", {"name": "solo", "code_len": 50}) + s._schedule_acc_flush() + await asyncio.sleep(0) + assert cortex.encoded == [] + # Buffer still cleared regardless. + assert s._acc.events == () + + @pytest.mark.asyncio + async def test_clears_without_encoding_when_cortex_off(self): + cortex = FakeCortex(mode="off") + s = _make_session_stub(cortex) + # Direct ACC.observe() bypasses the _acc_observe gate so we + # can verify the flush itself respects mode=="off". (In + # production this state can't be reached — _acc_observe would + # have refused the input — but the flush has its own guard + # for defense-in-depth and we want to exercise it.) + s._acc.observe("scratchpad_call", {"name": "a", "code_len": 100}) + s._acc.observe("scratchpad_call", {"name": "b", "code_len": 100}) + s._schedule_acc_flush() + await asyncio.sleep(0) + assert cortex.encoded == [] + assert s._acc.events == () + + def test_acc_observe_silent_in_off_mode(self): + # In off mode the ACC observes nothing — events drop on the + # floor at the safe-emit wrapper. Cheaper than passive when + # the user wants the feature entirely disabled. + cortex = FakeCortex() + s = _make_session_stub(cortex, acc_mode="off") + s._acc_observe("scratchpad_call", {"name": "a", "code_len": 100}) + assert s._acc.events == () + + def test_clears_buffer_when_no_event_loop(self): + # Synchronous context — asyncio.create_task() inside the flush + # will raise RuntimeError. The flush must catch that and still + # clear the buffer rather than leak events into the next turn. + cortex = FakeCortex() + s = _make_session_stub(cortex) + s._acc.observe("scratchpad_call", {"name": "a", "code_len": 100}) + s._acc.observe("scratchpad_call", {"name": "b", "code_len": 100}) + s._schedule_acc_flush() + assert s._acc.events == () + # And no encode happened because we never got the loop. + assert cortex.encoded == [] + + +class TestAccMaybeNudge: + """Layer 2 — mid-turn nudge wiring contract.""" + + def test_passive_mode_appends_nothing(self): + cortex = FakeCortex() + s = _make_session_stub(cortex, acc_mode="passive") + s._acc_observe("scratchpad_call", {"name": "a", "code_len": 100}) + s._acc_observe("scratchpad_call", {"name": "b", "code_len": 100}) + tool_results: list[dict] = [ + {"type": "tool_result", "tool_use_id": "x", "content": "ok"}, + ] + n = s._acc_maybe_nudge(tool_results) + # Passive mode: the nudge is a no-op even though a pattern fired. + # End-of-turn drain still writes the lesson to memory; mid-turn + # injection is skipped to keep the turn loop unchanged. + assert n == 0 + assert len(tool_results) == 1 + + def test_active_mode_appends_text_block(self): + cortex = FakeCortex() + s = _make_session_stub(cortex, acc_mode="active") + s._acc_observe("scratchpad_call", {"name": "a", "code_len": 100}) + s._acc_observe("scratchpad_call", {"name": "b", "code_len": 100}) + tool_results: list[dict] = [ + {"type": "tool_result", "tool_use_id": "x", "content": "ok"}, + ] + n = s._acc_maybe_nudge(tool_results) + assert n == 1 + # Original tool_result preserved; nudge appended after it. + assert tool_results[0]["type"] == "tool_result" + assert tool_results[1]["type"] == "text" + assert "Anton self-check" in tool_results[1]["text"] + assert "detect_name_switch" in tool_results[1]["text"] + + def test_active_mode_one_nudge_per_detector_per_turn(self): + # Same pattern firing on two consecutive rounds should produce + # ONE nudge total (not one per round). The dlPFC has been told; + # re-stating the same alarm round after round would just spam + # the history. + cortex = FakeCortex() + s = _make_session_stub(cortex, acc_mode="active") + s._acc_observe("scratchpad_call", {"name": "a", "code_len": 100}) + s._acc_observe("scratchpad_call", {"name": "b", "code_len": 100}) + tr1: list[dict] = [] + n1 = s._acc_maybe_nudge(tr1) + assert n1 == 1 + + # Round 2 — more events of the same kind, no new pattern. + s._acc_observe("scratchpad_call", {"name": "c", "code_len": 100}) + tr2: list[dict] = [] + n2 = s._acc_maybe_nudge(tr2) + assert n2 == 0 + + def test_active_mode_silent_when_no_new_pattern(self): + cortex = FakeCortex() + s = _make_session_stub(cortex, acc_mode="active") + s._acc_observe("scratchpad_call", {"name": "solo", "code_len": 50}) + tool_results: list[dict] = [] + n = s._acc_maybe_nudge(tool_results) + assert n == 0 + assert tool_results == [] + + def test_off_mode_skips_nudge_entirely(self): + # Off mode short-circuits the safe-emit wrapper, so the ACC + # never sees the events in the first place. Even if a detector + # had something to say, there's nothing in the buffer to read. + cortex = FakeCortex() + s = _make_session_stub(cortex, acc_mode="off") + s._acc_observe("scratchpad_call", {"name": "a", "code_len": 100}) + s._acc_observe("scratchpad_call", {"name": "b", "code_len": 100}) + n = s._acc_maybe_nudge([]) + assert n == 0 + assert s._acc.events == () + + def test_detector_exception_does_not_crash_turn(self): + # Wire a broken detector and verify the nudge wrapper swallows + # the exception. Layer 1's end-of-turn flush still runs + # independently — we don't want a buggy detector blocking the + # whole turn loop. + cortex = FakeCortex() + stub = SimpleNamespace( + _cortex=cortex, + _acc_mode="active", + ) + def boom(_events): + raise RuntimeError("detector bug") + stub._acc = AnteriorCingulate(detectors=(boom,)) + stub._acc_observe = types.MethodType(ChatSession._acc_observe, stub) + stub._acc_maybe_nudge = types.MethodType(ChatSession._acc_maybe_nudge, stub) + stub._acc_observe("scratchpad_call", {"name": "x", "code_len": 1}) + n = stub._acc_maybe_nudge([]) + assert n == 0 # graceful degradation