From e2a8bb99edf13a9bcbef3e545851d21ce0ddabc7 Mon Sep 17 00:00:00 2001 From: Jorge Torres Date: Thu, 14 May 2026 02:25:32 -0700 Subject: [PATCH] and scoring for what it learns to keep tabs so it can understand what rules actually do something --- anton/README.md | 66 +++++++- anton/core/memory/cortex.py | 210 ++++++++++++++++++++---- anton/core/memory/ranker.py | 201 +++++++++++++++++++++++ anton/core/memory/rule_stats.py | 159 ++++++++++++++++++ anton/core/session.py | 27 +++ anton/memory/manage.py | 79 +++++++++ tests/test_cortex_ranker_integration.py | 191 +++++++++++++++++++++ tests/test_memory_rankings.py | 104 ++++++++++++ tests/test_ranker.py | 136 +++++++++++++++ tests/test_rule_stats.py | 155 +++++++++++++++++ tests/test_session_acc_init.py | 117 ++++++++++++- 11 files changed, 1405 insertions(+), 40 deletions(-) create mode 100644 anton/core/memory/ranker.py create mode 100644 anton/core/memory/rule_stats.py create mode 100644 tests/test_cortex_ranker_integration.py create mode 100644 tests/test_memory_rankings.py create mode 100644 tests/test_ranker.py create mode 100644 tests/test_rule_stats.py diff --git a/anton/README.md b/anton/README.md index 484e78b2..e62faa35 100644 --- a/anton/README.md +++ b/anton/README.md @@ -614,8 +614,7 @@ Modes (env var `ANTON_ACC_MODE`, mirrors `ANTON_MEMORY_MODE`): | `passive` (default) | Layer 1 only. Lessons drain to memory at end-of-turn; next turn's system prompt picks them up. Adds zero surface to the turn loop. | | `active` | Layer 1 + Layer 2. Lessons ALSO inject inline as text blocks in `tool_results` so the LLM sees them on the very next round. Stronger learning signal; more invasive. | -What is NOT yet wired (deliberate): - - **Layer 3 — retrieval-scored rule ranking.** At system-prompt assembly, score each candidate rule by relevance to the current turn's context and load the top-K within the token budget. Per-rule retrieval counters age out rules that never make the cut.
Pairs with optional outcome-tracking (did the rule reduce its target pattern after it landed?). Needs a small embedding index over rules + a ranker call on the load path. + - **Layer 3 — retrieval-scored rule ranking.** Built. `Cortex.build_memory_context()` now routes `## When` rules through a BM25 ranker (`anton/core/memory/ranker.py`) scored against the current user message; only the top-K within the char budget land in the prompt. `## Always` / `## Never` rules bypass the ranker — they're unconditional by definition. Every rule that lands gets a retrieval counter bump in `rules.stats.json` (the sidecar managed by `anton/core/memory/rule_stats.py`). The Phase C outcome bridge wires the ACC's end-of-turn flush back into stats: when a detector fires AND its corresponding rule was loaded this turn, the rule's `ignored` counter bumps — high `ignored` is the consolidator's signal to rewrite or escalate. `/memory rankings` is the debug surface. ### Vocabulary discipline @@ -659,6 +658,67 @@ Detectors are stateless functions of `Sequence[Event] → Lesson | None`. Each d Like the cerebellum, the ACC is a *producer*. It does not own storage. Lessons it generates flow into the same Engram pipeline that the cerebellum and consolidator already use. The de-dupe predicate is caller-supplied (`has_similar_lesson`) so the wiring layer can choose substring, embedding, or semantic similarity without changing ACC internals. +## Layer 3 — Retrieval-Scored Rule Ranking + +Layers 1 and 2 produce rules; Layer 3 decides which ones to load on any given turn. The Cortex no longer dumps every `## When` rule into the system prompt — it scores each rule by relevance to the current user message and loads the top-K within budget. The mechanism is BM25 (lexical) rather than embeddings (semantic) because the corpus is tiny (<50 rules typically), the rules are 1–3 sentences, and rules + user messages share domain nouns. Microseconds per call, no LLM dependency. + +### What ranks vs.
what doesn't + +| Section in `rules.md` | Treatment | Reason | +|---|---|---| +| `## Always` | Loaded in full every turn | Unconditional — ranking would defeat the point | +| `## Never` | Loaded in full every turn | Unconditional | +| `## When` | Ranked by BM25 relevance, top-K within budget | Conditional rules ARE relevance-shaped by construction | + +### Pieces + +| Module | Role | +|---|---| +| `anton/core/memory/ranker.py` | `Ranker.rank(rules, query)` → BM25-scored `RankedRule`s. `Ranker.select_within_budget(ranked, budget_tokens, floor_k, cap_k)` → final selection. No LLM call, no API key, deterministic. | +| `anton/core/memory/rule_stats.py` | `RuleStats` sidecar at `~/.anton/memory/rules.stats.json`. Buffer-and-flush write pattern — `record_retrieval` / `record_ignored` are in-memory dict updates; `flush()` does a single atomic `.tmp + os.replace` under `fcntl.flock`. One disk write per turn, not one per rule. | +| `anton/core/memory/cortex.py` | `_retrieve_relevant_rules` rewrites the `## When` section through the ranker and records retrievals. `consume_retrieved_this_turn()` exposes the per-turn rule-id set to the outcome bridge. | +| `anton/core/session.py` | `_schedule_acc_flush` now consults the per-turn retrieval set and bumps `ignored` on rules whose ACC-detected pattern fired despite being loaded. | +| `anton/memory/manage.py` | `/memory rankings` debug surface. Highlights noisy rules (high `ignored`) and cold rules (zero retrievals). | + +### Cold-start behaviour + +- No user message yet OR query has no scorable terms after stopword removal → all rules loaded in input order. Ranker only filters under budget pressure. +- Corpus under `_RULES_BUDGET_CHARS` (~6000 chars) → no ranking; full corpus loaded. +- New rule (just encoded) → starts at zero retrievals/ignored, isn't penalised at tiebreak. First retrieval creates its record. + +### Stable rule identity + +Stats key: `sha256(rule.text.strip().lower())[:16]`. 
Stable for the rule's lifetime — but a consolidator rewrite changes the hash and resets the counters. Acceptable v1 trade-off; v2 should attach a UUID in the rule's HTML-comment metadata so edits preserve identity. Without that, large-scale rephrasing zeroes out the very telemetry we'd use to decide which rules to keep. + +### The outcome bridge (Phase C) + +Layer 1's `_schedule_acc_flush()` already drains lessons through `cortex.encode()`. Layer 3 adds one step before encoding: + +1. Get the ACC's fired lessons via `at_end_of_turn()`. +2. Call `cortex.consume_retrieved_this_turn()` — returns the set of rule IDs that landed in this turn's prompt, and clears the set. +3. For each fired lesson, if its rule-ID is in the retrieved set, bump `rule_stats.record_ignored(lesson.rule)`. The LLM saw the rule and the pattern fired anyway — that's a strong "this rule isn't sticking" signal. +4. Flush stats. Then encode the engrams as before. + +Brand-new lessons (never retrieved, because the rule was just created) correctly skip the bump — the LLM can't be ignoring a rule it hasn't seen. + +### Debug surface — `/memory rankings` + +``` +$ anton → /memory rankings + +Rule rankings (retrieval-scoring telemetry) + + RETR IGN LAST RULE + 12 0 2026-05-14 Use ONE scratchpad name per task and reuse it for every cell... + 11 2 2026-05-14 When a tool fails, don't retry with the same arguments... + 7 0 2026-05-13 When the same error message appears repeatedly in one turn... + 3 0 2026-05-12 Don't reset the scratchpad to recover from errors... + 1 0 2026-04-30 For CSV files with mixed column types, pass low_memory=False... + 0 0 — Use httpx instead of requests for HTTP calls. +``` + +Noisy rules (`IGN > 0`) render in warning color — candidates for rewriting or escalation. Cold rules (`RETR = 0`) render dim — candidates for compaction. The consolidator can later read `rules.stats.json` directly to drive automated aging-out.
+ ## Structured Output — `LLMClient.generate_object` Anton has a single primitive for getting structured data out of the LLM, used by the cerebellum, the consolidator, the cortex's identity/compaction passes, the connect collector, the skill drafter, and the custom-datasource flow. It lives at `anton/llm/client.py`: @@ -778,6 +838,8 @@ anton/core/memory/ LONG-TERM MEMORY (brain-mapped modules) ├── consolidator.py Consolidator class (sleep-replay → Engrams) ├── cerebellum.py Cerebellum class (per-cell supervised error learning) ├── acc.py AnteriorCingulate class (turn-level pattern error detection) +├── ranker.py BM25 ranker for retrieval-scored rule selection (Layer 3) +├── rule_stats.py Per-rule retrieval/ignored counter sidecar (Layer 3) └── skills.py Skill, SkillStore, SkillStats — procedural memory storage layer anton/memory/ LEGACY / ORTHOGONAL (not the brain-mapped memory system) diff --git a/anton/core/memory/cortex.py b/anton/core/memory/cortex.py index a3e75500..510cc45c 100644 --- a/anton/core/memory/cortex.py +++ b/anton/core/memory/cortex.py @@ -27,6 +27,8 @@ from anton.core.memory.base import HippocampusProtocol from anton.core.memory.base import Engram from anton.core.memory.hippocampus import Hippocampus +from anton.core.memory.ranker import Ranker +from anton.core.memory.rule_stats import RuleStats, rule_id if TYPE_CHECKING: from anton.core.llm.client import LLMClient @@ -129,6 +131,32 @@ def __init__( self._llm = llm_client self._turn_count = 0 + # Layer 3 — retrieval-scored rule ranking. + # Stateless BM25 ranker (no LLM call, no API key). The + # cortex re-uses it for both global and project rule paths. + self._ranker = Ranker() + # Per-rule retrieval / outcome counters. Sidecar JSON lives + # alongside global rules.md (one file across project switches — + # rules can fire either scope). 
Best-effort: when the + # hippocampus is a remote / protocol-only backend without a + # local `_dir`, RuleStats stays None and the cortex skips the + # counter bumps without losing the ranker behaviour itself. + global_dir = getattr(global_hc, "_dir", None) + self._rule_stats: RuleStats | None = ( + RuleStats(Path(global_dir) / "rules.stats.json") + if isinstance(global_dir, (str, Path)) + else None + ) + # Phase C — outcome bridge. Cumulative set of rule IDs that + # landed in this turn's system prompt across however many + # `build_memory_context` calls happened (a turn may rebuild + # the prompt mid-flight on retries/compaction). The ACC's + # end-of-turn flush drains this via `consume_retrieved_this_turn` + # and, for each lesson whose rule_id is in the set, bumps the + # corresponding rule's `ignored` counter — the LLM saw the + # rule and the pattern still fired. + self._retrieved_this_turn: set[str] = set() + # One-time migration: identity is singular and global. Any entries that # landed in project scope from the old encode() bug are merged upward. # Global wins on key conflicts — orphaned entries are likely stale @@ -213,28 +241,121 @@ async def build_memory_context(self, user_message: str = "") -> str: if minds_topic: sections.append(f"## Minds — Datasource Context\n{minds_topic}") + # Layer 3 — flush the buffered retrieval counters once per + # build (one disk write per turn, not one per rule). Best- + # effort: if there's no stats backing store (remote + # hippocampus, missing dir), this is a no-op. + if self._rule_stats is not None: + try: + self._rule_stats.flush() + except OSError: + # Stats are telemetry, not gating data — a failed write + # must not break system-prompt assembly. + pass + if not sections: return "" return "\n\n" + "\n\n".join(sections) - async def _retrieve_relevant_rules(self, all_rules: str, user_message: str) -> str: - """Filter rules to only those relevant to the current user message. 
+ # Regex to strip metadata from a rule line. Module- + # scoped at the class for readability; cheap to (re)compile. + import re as _re + _METADATA_RE = _re.compile(r"", _re.DOTALL) - Brain analog: dlPFC cue-dependent recall — the prefrontal cortex - selects which memories to activate based on current goals, rather - than loading everything into working memory. + def _extract_rule_body(self, line: str) -> str: + """Pull the human-readable rule text out of a bullet line. - Always/Never rules are behavioral constraints — always loaded in full. - Only conditional (When/If) rules are filtered by relevance. - If rules are under budget or no LLM is available, returns as-is. + ``- Use httpx instead of requests `` + → + ``Use httpx instead of requests`` + + Used as both the BM25 document AND the stable-hash input for + `RuleStats`, so the metadata comments (which carry per-rule + timestamps that change on every write) don't corrupt either. """ - if not user_message or self._llm is None: + s = (line or "").strip() + if s.startswith("- "): + s = s[2:].strip() + s = self._METADATA_RE.sub("", s).strip() + return s + + def _record_retrievals_for_lines(self, lines: list[str]) -> None: + """Bump retrieval counters for every rule-bullet line that + actually carries content. Section headers / blank lines are + skipped — they aren't rules. No-op when ``_rule_stats`` is + unavailable (remote backend, etc.). 
+ + Also populates ``self._retrieved_this_turn`` (rule-ID set) + so the Phase C outcome bridge can correlate fired lessons + against rules-that-were-actually-loaded.""" + if self._rule_stats is None: + return + for line in lines: + stripped = line.strip() + if not stripped.startswith("- "): + continue + body = self._extract_rule_body(line) + if body: + self._rule_stats.record_retrieval(body) + self._retrieved_this_turn.add(rule_id(body)) + + def consume_retrieved_this_turn(self) -> set[str]: + """Return the set of rule IDs retrieved into the system prompt + since the last call, AND clear the set. + + Take-and-clear: the consumer (typically the ACC end-of-turn + flush) reads the snapshot once per turn. Multiple consumers + would each see a different filtered view, which is rarely + what callers want — if more than one consumer needs the + signal, build a fan-out at the wiring layer instead. + + Empty set is a valid answer (cold start, no rules in memory, + or remote hippocampus where stats tracking is disabled).""" + out = self._retrieved_this_turn + self._retrieved_this_turn = set() + return out + + async def _retrieve_relevant_rules(self, all_rules: str, user_message: str) -> str: + """Select the rules that go into the system prompt. + + Layer 3 — retrieval-scored rule ranking: + + - ``## Always`` / ``## Never`` rules are unconditional and + always loaded in full. They're not ranked because ranking + unconditional rules is a category error. + - ``## When`` rules are ranked by BM25 relevance against the + current ``user_message``. The top-K within budget land in + the prompt; the rest are dropped for this turn. + - Every rule that lands in the prompt bumps its retrieval + counter via ``RuleStats``. Phase C (outcome bridge) will + later use these counters to compute an "ignored" signal + when ACC detects the corresponding pattern despite the + rule having been loaded. 
+ + Cold-start behaviour: when the corpus fits in the char budget + OR the user message has no scorable terms, all rules are + loaded and their retrievals recorded. The ranker is a + budget-pressure tool, not a permanent filter. + + Brain analog: dlPFC cue-dependent recall. The PFC scores + relevance against current goals and activates the top + candidates rather than loading everything into working memory. + """ + # No query → unfiltered. Still record retrievals so the + # telemetry is honest about what's in the prompt. + if not user_message: + self._record_retrievals_for_lines(all_rules.splitlines()) return all_rules + + # Under budget → no point ranking; load all + record. if len(all_rules) <= self._RULES_BUDGET_CHARS: + self._record_retrievals_for_lines(all_rules.splitlines()) return all_rules - # Split rules into mandatory (Always/Never) and filterable (When) + # Split into mandatory (Always / Never / non-section lines) vs. + # rankable (When bullets). Section headers stay with mandatory + # so the output keeps its markdown structure. lines = all_rules.splitlines() mandatory_lines: list[str] = [] when_lines: list[str] = [] @@ -250,7 +371,7 @@ async def _retrieve_relevant_rules(self, all_rules: str, user_message: str) -> s mandatory_lines.append(line) elif stripped.startswith("## When"): current_section = "when" - mandatory_lines.append(line) # keep the header + mandatory_lines.append(line) elif stripped.startswith("## ") or stripped.startswith("# "): current_section = "" mandatory_lines.append(line) @@ -259,37 +380,54 @@ async def _retrieve_relevant_rules(self, all_rules: str, user_message: str) -> s else: mandatory_lines.append(line) - # If When section is small, no need to filter + # Tiny When section → no ranking work to do. 
when_text = "\n".join(when_lines).strip() if not when_text or len(when_text) < 1000: + self._record_retrievals_for_lines(lines) return all_rules - # Filter only the When rules - try: - response = await self._llm.code( - system=self._RULES_RETRIEVAL_PROMPT, - messages=[ - { - "role": "user", - "content": f"User message: {user_message}\n\nRules:\n{when_text}", - } - ], - max_tokens=4096, - ) - result = response.content.strip() - if result.upper() == "NONE": - filtered_when = "" - elif result: - filtered_when = result - else: - filtered_when = when_text - except Exception: - filtered_when = when_text + # Build (body, original_line) pairs so we can rank on bodies + # but emit the original markdown lines (preserving metadata + # comments the consumer / consolidator might still read). + candidates: list[tuple[str, str]] = [] + for line in when_lines: + body = self._extract_rule_body(line) + if body: + candidates.append((body, line)) + + if not candidates: + self._record_retrievals_for_lines(lines) + return all_rules + + bodies = [b for b, _ in candidates] + body_to_line = {b: l for b, l in candidates} + + ranked = self._ranker.rank(bodies, user_message) + + # Remaining char budget = total budget minus what mandatory + # lines already consume. Convert to a rough token budget + # (~4 chars/token English heuristic) for the ranker's selector. + mandatory_chars = sum(len(l) + 1 for l in mandatory_lines) + remaining_chars = max(0, self._RULES_BUDGET_CHARS - mandatory_chars) + remaining_tokens = max(100, remaining_chars // 4) + selected = self._ranker.select_within_budget( + ranked, budget_tokens=remaining_tokens + ) + + selected_lines: list[str] = [] + for r in selected: + line = body_to_line.get(r.text) + if line is not None: + selected_lines.append(line) + + # Record retrievals for everything that lands in the prompt — + # mandatory rules AND the selected When rules. (Section + # headers and blanks are filtered inside the helper.) 
+ self._record_retrievals_for_lines(mandatory_lines + selected_lines) - # Reassemble: mandatory sections + filtered When rules output = "\n".join(mandatory_lines) - if filtered_when: - output += "\n" + filtered_when + if selected_lines: + output += "\n" + "\n".join(selected_lines) return output def get_scratchpad_context(self) -> str: diff --git a/anton/core/memory/ranker.py b/anton/core/memory/ranker.py new file mode 100644 index 00000000..ca255f37 --- /dev/null +++ b/anton/core/memory/ranker.py @@ -0,0 +1,201 @@ +"""BM25-style lexical ranker over rule text — Layer 3 of the ACC path. + +Scores each rule against the current turn's query (the user message) +and returns them sorted by relevance. The result drives which rules +land in the system prompt under the ``## When`` section when the +total rule corpus exceeds the token budget. + +Why BM25 and not embeddings (v1): + + - No new dependencies. anton has no embeddings client today, and + adding one means another API surface (or a local model + tokenizer) + that has to live alongside the existing planning/coding split. + - No LLM call on the prompt-assembly hot path. Build-memory-context + runs at the start of every turn; a synchronous, deterministic + ranker keeps the cold-start latency low. + - For 1–3 sentence rules + short user messages, lexical matching + works because rules contain domain nouns ('pandas', 'CSV', + 'publish', 'scratchpad', 'datavault') and user messages mention + those same nouns. The cases where pure lexical matching fails + (paraphrase, abstract relevance) also happen to be the cases + where the corpus is small enough to fit in the budget without + ranking — so the failure mode is graceful. + - Microsecond-cheap: O(rules × query_terms). For ~50 rules × ~20 + query terms the entire rank() call is sub-millisecond. + +Designed to be **drop-in replaceable** by an embedding-based ranker +in v2. 
Keep this interface narrow (``rank`` + ``select_within_budget``) +so swapping implementations doesn't touch the cortex call site. +""" + +from __future__ import annotations + +import math +import re +from collections import Counter +from dataclasses import dataclass + + +# Minimal stopword list. Stemming deliberately omitted — adds dep + bugs +# for marginal lift on these short documents. The list is intentionally +# small; over-stopping (e.g. removing "when", "if") would hurt rule +# texts that pivot on those words. +_STOPWORDS = frozenset({ + "a", "an", "the", "and", "or", "but", "of", "in", "on", "at", + "to", "from", "for", "with", "by", "is", "are", "was", "were", + "be", "been", "being", "do", "does", "did", "doing", "have", + "has", "had", "having", "will", "would", "could", "should", + "may", "might", "can", "this", "that", "these", "those", + "you", "your", "yours", "it", "its", "as", "so", "not", +}) + + +_WORD_RE = re.compile(r"[a-z0-9]+") + + +def tokenize(text: str) -> list[str]: + """Lowercase → strip punctuation → drop stopwords. Preserves + digits so rule text like 'over 5 KB' keeps '5' as a queryable + term. Public for testing and for the Phase C outcome bridge, + which compares rule text against detector lesson text.""" + if not text: + return [] + return [t for t in _WORD_RE.findall(text.lower()) if t not in _STOPWORDS] + + +@dataclass(frozen=True) +class RankedRule: + """One rule + its BM25 score for the current query. + + ``token_estimate`` is a rough word count used for budget + arithmetic in ``select_within_budget``. We deliberately don't + import a real tokenizer here — that would couple the ranker to + the LLM provider. The 1.3× word-count multiplier matches typical + English BPE expansion well enough for budget enforcement that + only needs to be approximately right. 
+ """ + + text: str + score: float + token_estimate: int + + +def _estimate_tokens(text: str) -> int: + return max(1, int(len(text.split()) * 1.3)) + + +class Ranker: + """BM25 ranker with budget-aware selection. + + Tunable parameters use the standard Robertson defaults + (``k1=1.5``, ``b=0.75``). We haven't tuned them against real rule + corpora — the defaults are fine for v1 because the corpus is + tiny (typically <50 rules). When the corpus grows and tuning + matters, switch to embeddings rather than tuning BM25 — the + upgrade path is more productive than the tuning one. + """ + + def __init__(self, k1: float = 1.5, b: float = 0.75): + self._k1 = k1 + self._b = b + + def rank(self, rule_texts: list[str], query: str) -> list[RankedRule]: + """Rank rules by BM25 score against ``query``. + + Empty query → return rules in their input order with + ``score=0.0``. This is the cold-start case (no user message + yet, or the message contains no scorable terms after stopword + removal). Falling back to insertion order rather than dropping + all rules keeps the system prompt intact for turn 1. + + Stable for ties: rules with equal scores keep their input + order (Python's sort is stable). When the consolidator + eventually wants "most recently added rule wins ties", it can + pre-sort the input list to match. + """ + if not rule_texts: + return [] + + query_terms = tokenize(query) + if not query_terms: + return [ + RankedRule(text=t, score=0.0, token_estimate=_estimate_tokens(t)) + for t in rule_texts + ] + + docs = [tokenize(t) for t in rule_texts] + doc_lens = [len(d) for d in docs] + avgdl = (sum(doc_lens) / len(doc_lens)) if doc_lens else 1.0 + avgdl = max(avgdl, 1.0) + + # Document frequency per term across the rule corpus. 
+ df: dict[str, int] = {} + for d in docs: + for term in set(d): + df[term] = df.get(term, 0) + 1 + + N = len(docs) + + def idf(term: str) -> float: + n = df.get(term, 0) + # BM25's "plus-one" IDF — strictly non-negative, well-defined + # when a term is in every doc (n == N) or none (n == 0). + return math.log((N - n + 0.5) / (n + 0.5) + 1.0) + + scored: list[RankedRule] = [] + for i, doc in enumerate(docs): + tfs = Counter(doc) + doc_len = doc_lens[i] or 1 + score = 0.0 + for term in query_terms: + tf = tfs.get(term, 0) + if tf == 0: + continue + numer = tf * (self._k1 + 1) + denom = tf + self._k1 * (1 - self._b + self._b * doc_len / avgdl) + score += idf(term) * (numer / denom) + scored.append(RankedRule( + text=rule_texts[i], + score=score, + token_estimate=_estimate_tokens(rule_texts[i]), + )) + + # Stable sort: ties preserve input order. reverse=True → desc. + scored.sort(key=lambda r: r.score, reverse=True) + return scored + + def select_within_budget( + self, + ranked: list[RankedRule], + budget_tokens: int, + *, + floor_k: int = 3, + cap_k: int = 20, + ) -> list[RankedRule]: + """Take rules in ranked order until we hit the budget. + + - ``floor_k``: always load the top N rules even if they + exceed the budget. The whole point of the ranker is that + the most-relevant rules are valuable; under a tiny budget, + a hard cutoff would defeat the purpose. Skip floor + enforcement implicitly by passing fewer than ``floor_k`` + rules. + - ``cap_k``: never load more than this many even if the + budget allows. A handful of well-chosen rules is more + useful to the LLM than a wall of them, and BM25 scores + drop off sharply past the top results. 
+ """ + out: list[RankedRule] = [] + used = 0 + for i, r in enumerate(ranked): + if i >= cap_k: + break + if i < floor_k: + out.append(r) + used += r.token_estimate + continue + if used + r.token_estimate > budget_tokens: + break + out.append(r) + used += r.token_estimate + return out diff --git a/anton/core/memory/rule_stats.py b/anton/core/memory/rule_stats.py new file mode 100644 index 00000000..7a4d80e3 --- /dev/null +++ b/anton/core/memory/rule_stats.py @@ -0,0 +1,159 @@ +"""Per-rule retrieval and outcome counters — Layer 3 of the ACC path. + +Tracks two things per rule: + + - **retrievals**: how often a rule has been selected into a turn's + system prompt by the ranker (Phase B). + - **ignored**: how often a rule was loaded into the prompt AND the + pattern it warns against fired anyway (Phase C — outcome bridge). + +Stored as a sidecar JSON file (``rules.stats.json`` next to +``rules.md``) rather than inline in the markdown metadata. Two reasons +for the sidecar: + + 1. Writing per-rule counters to ``rules.md`` would require parsing, + mutating, and re-emitting the whole markdown file on every turn. + A JSON sidecar is one ``json.dump`` away from atomic. + 2. The stats are operationally cheap and high-churn; the rule text + is canon. Keeping them separate means a typo in stats can't + corrupt the canonical store. + +Rule identity is ``sha256(rule.text.strip().lower())[:16]``. Stable +for the rule's lifetime; if the consolidator rewrites a rule, the +hash changes and its counters reset. Acceptable v1 trade-off — v2 +should attach a stable UUID in the rule's HTML-comment metadata so +edits preserve identity. Without that, large-scale rephrasing by the +consolidator would zero out the retrieval data we want to use to +*decide* which rules to keep. + +Concurrency: ``flush()`` writes via ``.tmp`` + ``os.replace`` under +``fcntl.flock(LOCK_EX)``. Same shape every other anton storage layer +already uses; the inherited POSIX-only constraint is intentional. 
+ +Usage pattern: callers should mutate via ``record_retrieval`` / +``record_ignored`` and call ``flush()`` once at end of the operation +(typically once per ``build_memory_context()``). Don't flush after +every record — that's one disk write per rule retrieved per turn, +which is silly when the natural batch boundary is the turn itself. +""" + +from __future__ import annotations + +import datetime as dt +import fcntl +import hashlib +import json +import os +from pathlib import Path +from typing import Any + + +_STATS_VERSION = 1 + + +def rule_id(text: str) -> str: + """Stable 64-bit hash of the rule text. + + Public so test fixtures, debug surfaces, and the eventual + ``/memory --rankings`` view can compute the same ID without + duplicating the hashing logic. Changes if the text changes — see + module docstring for the v1 trade-off. + """ + norm = (text or "").strip().lower() + return hashlib.sha256(norm.encode("utf-8")).hexdigest()[:16] + + +def _now_iso() -> str: + return dt.datetime.now(dt.timezone.utc).isoformat() + + +def _blank_record() -> dict[str, Any]: + return { + "retrievals": 0, + "ignored": 0, + "last_retrieved": None, + } + + +class RuleStats: + """Lightweight stats sidecar with a buffer-and-flush write pattern. + + Reads on construction (best-effort: a corrupt file becomes a fresh + state and gets overwritten on next flush, rather than crashing the + turn). Mutations buffer in memory; ``flush()`` writes atomically + when the buffer is dirty. Callers should flush once per turn + rather than after every record. 
+ """ + + def __init__(self, path: Path): + self._path = Path(path) + self._data: dict[str, Any] = {"version": _STATS_VERSION, "rules": {}} + self._dirty = False + self._load() + + # ── persistence ────────────────────────────────────────────────── + + def _load(self) -> None: + if not self._path.exists(): + return + try: + raw = json.loads(self._path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + # Corrupted or unreadable — keep fresh in-memory state and + # overwrite cleanly on next flush. Logging is intentionally + # absent here; an unreadable stats file shouldn't be a + # turn-stopping incident. + return + if isinstance(raw, dict) and isinstance(raw.get("rules"), dict): + self._data = raw + self._data.setdefault("version", _STATS_VERSION) + + def flush(self) -> None: + """Persist the in-memory state if dirty. Idempotent.""" + if not self._dirty: + return + self._path.parent.mkdir(parents=True, exist_ok=True) + tmp = self._path.with_suffix(self._path.suffix + ".tmp") + with open(tmp, "w", encoding="utf-8") as f: + fcntl.flock(f.fileno(), fcntl.LOCK_EX) + json.dump(self._data, f, indent=2, sort_keys=True) + os.replace(tmp, self._path) + self._dirty = False + + # ── mutators ───────────────────────────────────────────────────── + + def record_retrieval(self, rule_text: str) -> None: + """Bump the retrieval counter for ``rule_text`` and update its + ``last_retrieved`` timestamp. No-op on empty input.""" + if not (rule_text or "").strip(): + return + rid = rule_id(rule_text) + rec = self._data["rules"].setdefault(rid, _blank_record()) + rec["retrievals"] = int(rec.get("retrievals", 0)) + 1 + rec["last_retrieved"] = _now_iso() + self._dirty = True + + def record_ignored(self, rule_text: str) -> None: + """Bump the ignored counter — used by Phase C (outcome bridge) + when an ACC detector fires for a pattern whose corresponding + rule WAS loaded into this turn's prompt. 
No-op on empty input.""" + if not (rule_text or "").strip(): + return + rid = rule_id(rule_text) + rec = self._data["rules"].setdefault(rid, _blank_record()) + rec["ignored"] = int(rec.get("ignored", 0)) + 1 + self._dirty = True + + # ── readers ────────────────────────────────────────────────────── + + def get(self, rule_text: str) -> dict[str, Any]: + """Stats for one rule. Returns a blank record if absent — + callers don't need to distinguish 'never seen' from 'zero + counters' for ranking-tiebreak purposes.""" + rid = rule_id(rule_text) + return dict(self._data["rules"].get(rid, _blank_record())) + + def all(self) -> dict[str, dict[str, Any]]: + """Snapshot of every recorded rule keyed by id. Used by tests + and the eventual ``/memory --rankings`` debug surface.""" + return {rid: dict(rec) for rid, rec in self._data["rules"].items()} diff --git a/anton/core/session.py b/anton/core/session.py index 3e0f685c..98ca1baa 100644 --- a/anton/core/session.py +++ b/anton/core/session.py @@ -1092,6 +1092,33 @@ def _schedule_acc_flush(self) -> None: return lessons = acc.at_end_of_turn() + + # Phase C — outcome bridge. For every lesson the ACC just + # produced, ask the cortex "was the rule version of this + # lesson actually loaded into this turn's system prompt?". + # If yes, the LLM SAW the rule and violated the pattern + # anyway → bump that rule's `ignored` counter. High `ignored` + # values are the consolidator's signal to rewrite or escalate. + # + # Done BEFORE engram encoding so a brand-new lesson (which + # would just-now create its rule) can't accidentally count + # itself as ignored. Existing rules that fired their pattern + # again ARE counted. 
+ retrieved_ids: set[str] = set() + if hasattr(cortex, "consume_retrieved_this_turn"): + retrieved_ids = cortex.consume_retrieved_this_turn() + rule_stats = getattr(cortex, "_rule_stats", None) + if lessons and rule_stats is not None and retrieved_ids: + try: + from anton.core.memory.rule_stats import rule_id as _rid + for lesson in lessons: + if _rid(lesson.rule) in retrieved_ids: + rule_stats.record_ignored(lesson.rule) + rule_stats.flush() + except OSError: + # Stats are telemetry — never break the turn. + pass + if not lessons: acc.clear() return diff --git a/anton/memory/manage.py b/anton/memory/manage.py index 9a234043..e319fad5 100644 --- a/anton/memory/manage.py +++ b/anton/memory/manage.py @@ -26,6 +26,7 @@ Command("/memory rules", "show behavioral rules"), Command("/memory rules delete ", "delete rule #n"), Command("/memory rules edit ", "edit rule #n"), + Command("/memory rankings", "show retrieval/ignored counters per rule"), None, Command("/memory lessons", "show learned lessons"), Command("/memory lessons delete ", "delete lesson #n"), @@ -114,6 +115,7 @@ def __init__( self.SUBCOMMANDS: dict[str, object] = { "help": self.help, "rules": self.rules, + "rankings": self.rankings, "lessons": self.lessons, "identity": self.identity, "episodes": self.episodes, @@ -238,6 +240,83 @@ async def rules(self, action: str = None, num: str = None) -> None: self.console.print(" [bold]/memory rules delete [/] — delete rule #n") self.console.print(" [bold]/memory rules edit [/] — edit rule #n") + async def rankings(self, *_args) -> None: + """Show retrieval-scoring telemetry for every stored rule. + + Layer 3 debug surface. 
Per rule (across both scopes), prints: + - RETR : times this rule was selected into a system prompt + - IGNORED : times the rule was loaded AND the pattern it + warns against fired anyway (Phase C outcome bridge) + - LAST : when the rule was last retrieved (ISO date) + - RULE : the rule text (truncated to keep rows scannable) + + Useful for: spotting cold rules (RETR=0 — candidates for + compaction), spotting noisy rules (high IGNORED — the LLM + keeps seeing them and not following them), and confirming + the ranker is doing reasonable things. + """ + c = self.console + stats = getattr(self.cortex, "_rule_stats", None) + if stats is None: + c.print() + c.print(" [anton.warning]No rule-stats backend on this cortex.[/]") + c.print(" [dim]Stats are recorded only when the hippocampus has a local " + "memory directory. Remote/protocol backends skip this telemetry.[/]") + c.print() + return + + # Gather all rules from both scopes, tagged by their hash so we + # can join against the stats sidecar. + from anton.core.memory.rule_stats import rule_id as _rid + + rows: list[tuple[str, int, int, str, str]] = [] # (rid, retr, ignored, last, text) + seen: set[str] = set() + for scope_hc in (self.cortex.global_hc, self.cortex.project_hc): + for engram in scope_hc.get_rules(): + rid = _rid(engram.text) + if rid in seen: + continue + seen.add(rid) + rec = stats.get(engram.text) + last = (rec.get("last_retrieved") or "—")[:10] # YYYY-MM-DD + rows.append(( + rid, + int(rec.get("retrievals", 0)), + int(rec.get("ignored", 0)), + last, + engram.text, + )) + + if not rows: + c.print() + c.print(" [dim]No rules in memory yet.[/]") + c.print() + return + + # Sort by retrievals desc, then ignored desc (noisy-but-loaded + # rules float up under heavy-use rules) so the most actionable + # entries are at the top. 
+ rows.sort(key=lambda r: (r[1], r[2]), reverse=True) + + # Rule text is free-form and may contain "[...]" sequences. Escape it + # so Rich prints it literally instead of parsing it as console markup + # (which can swallow text or raise MarkupError on a stray "[/]"). + from rich.markup import escape + + c.print() + c.print("[anton.cyan]Rule rankings[/] [dim](retrieval-scoring telemetry)[/]") + c.print() + c.print(f" [dim]{'RETR':>5} {'IGN':>4} {'LAST':<10} RULE[/]") + for _rid_unused, retr, ignored, last, text in rows: + # Truncate first, then escape, so an escape sequence is never cut. + short = escape(text if len(text) <= 90 else text[:87] + "…") + # Highlight noisy rules (IGN > 0) and cold rules (RETR = 0). + tag = "" + if ignored > 0: + tag = "[anton.warning]" + elif retr == 0: + tag = "[dim]" + close = "[/]" if tag else "" + c.print(f" {tag}{retr:>5} {ignored:>4} {last:<10} {short}{close}") + c.print() + c.print(" [dim]RETR = times loaded into the system prompt · " + "IGN = pattern fired despite the rule being loaded[/]") + c.print() + async def lessons(self, action: str = None, num: str = None) -> None: """Display stored lessons, numbered for easy reference.""" global_items = dict(enumerate(self.cortex.global_hc.get_lessons(), start=1)) diff --git a/tests/test_cortex_ranker_integration.py b/tests/test_cortex_ranker_integration.py new file mode 100644 index 00000000..de64c779 --- /dev/null +++ b/tests/test_cortex_ranker_integration.py @@ -0,0 +1,191 @@ +"""Integration: Cortex.build_memory_context wires the ranker + stats. + +These tests don't mock Hippocampus or the stats sidecar — they spin +up real instances on a tmp_path and verify the end-to-end shape: + + 1. A small rules.md is loaded in full (no ranking pressure), and + every bulleted rule gets its retrieval counter bumped. + 2. A large `## When` rules.md is ranked against the user message: + the most-relevant rule appears in the output, less-relevant + rules get dropped, and counters reflect what's loaded. + 3. The stats sidecar is written exactly once per build_memory_context + call (the buffer/flush contract). + 4. Always/Never rules are always loaded even when ranking kicks in.
+""" + +from __future__ import annotations + +import asyncio +import json +from pathlib import Path + +import pytest + +from anton.core.memory.cortex import Cortex +from anton.core.memory.hippocampus import Hippocampus +from anton.core.memory.base import Engram + + +def _make_cortex(tmp_path: Path) -> Cortex: + """Build an autopilot-mode Cortex over two real Hippocampus instances + rooted at ``tmp_path/global`` and ``tmp_path/project``.""" + global_dir = tmp_path / "global" + project_dir = tmp_path / "project" + global_dir.mkdir(parents=True, exist_ok=True) + project_dir.mkdir(parents=True, exist_ok=True) + return Cortex( + global_hc=Hippocampus(global_dir), + project_hc=Hippocampus(project_dir), + mode="autopilot", + ) + + +def _seed_when_rules(hc: Hippocampus, rules: list[str]) -> None: + """Encode each text in ``rules`` as a high-confidence, user-sourced + ``when`` rule, exactly as given. This helper does NOT pad: callers that + want ranking to trigger must pass rules long enough to push the corpus + past >6000 chars — under the threshold, Cortex returns everything.""" + for text in rules: + hc.encode_rule(text, kind="when", confidence="high", source="user") + + +class TestSmallCorpusLoadsAll: + @pytest.mark.asyncio + async def test_under_budget_loads_every_rule_and_records_retrievals( + self, tmp_path: Path + ): + cortex = _make_cortex(tmp_path) + _seed_when_rules(cortex.global_hc, [ + "Use httpx instead of requests", + "For CSV files use low_memory=False with pd.read_csv", + ]) + + out = await cortex.build_memory_context("what's the bitcoin price") + assert "httpx" in out + assert "low_memory=False" in out + + # Stats sidecar should have two recorded rules. + stats_path = tmp_path / "global" / "rules.stats.json" + assert stats_path.exists() + data = json.loads(stats_path.read_text()) + # Two distinct rules → two entries.
+ assert len(data["rules"]) == 2 + for rec in data["rules"].values(): + assert rec["retrievals"] == 1 + assert rec["last_retrieved"] is not None + + +class TestLargeCorpusTriggersRanking: + @pytest.mark.asyncio + async def test_ranker_keeps_relevant_drops_irrelevant(self, tmp_path: Path): + cortex = _make_cortex(tmp_path) + # Each rule is long enough to push the corpus past the 6000-char + # ranking trigger (about 30 rules of ~250 chars each). + pad = " " + ("x" * 200) + pandas_rules = [f"For CSV files use pandas read_csv with utf-8{pad}-{i}" for i in range(15)] + html_rules = [f"For HTML reports render with explicit charset utf-8{pad}-{i}" for i in range(15)] + _seed_when_rules(cortex.global_hc, pandas_rules + html_rules) + + # Query is squarely about CSV / pandas → pandas rules should rank + # higher. With cap_k=20 and budget pressure, we expect MORE pandas + # rules than html rules in the output. + out = await cortex.build_memory_context( + "load the sales.csv and summarize with pandas" + ) + pandas_in_out = sum(1 for r in pandas_rules if r in out) + html_in_out = sum(1 for r in html_rules if r in out) + assert pandas_in_out > html_in_out, ( + f"pandas={pandas_in_out} html={html_in_out} — ranker should " + f"favour CSV/pandas rules for a CSV/pandas query" + ) + + @pytest.mark.asyncio + async def test_always_and_never_sections_survive_ranking(self, tmp_path: Path): + cortex = _make_cortex(tmp_path) + cortex.global_hc.encode_rule( + "Use httpx instead of requests", kind="always", + confidence="high", source="user", + ) + cortex.global_hc.encode_rule( + "Use time.sleep() in scratchpad cells", kind="never", + confidence="high", source="user", + ) + pad = " " + ("x" * 200) + _seed_when_rules(cortex.global_hc, [ + f"When loading CSV files use low_memory=False{pad}-{i}" + for i in range(40) + ]) + + out = await cortex.build_memory_context( + "render an HTML dashboard with utf-8" # unrelated to CSV/httpx/sleep + ) + # Both unconditional rules survive even though the 
query has no + # lexical overlap with them — they're not ranked, just loaded. + assert "httpx" in out + assert "time.sleep" in out + + +class TestStatsSidecar: + @pytest.mark.asyncio + async def test_two_builds_increment_same_counter(self, tmp_path: Path): + cortex = _make_cortex(tmp_path) + _seed_when_rules(cortex.global_hc, ["Use httpx instead of requests"]) + + await cortex.build_memory_context("call an API") + await cortex.build_memory_context("make an HTTP request") + + data = json.loads((tmp_path / "global" / "rules.stats.json").read_text()) + # One rule, retrieved twice across two builds. + records = list(data["rules"].values()) + assert len(records) == 1 + assert records[0]["retrievals"] == 2 + + @pytest.mark.asyncio + async def test_no_rules_means_no_sidecar(self, tmp_path: Path): + # Cold start: no rules in the hippocampus → build_memory_context + # has nothing to record → the sidecar shouldn't appear. + cortex = _make_cortex(tmp_path) + await cortex.build_memory_context("anything") + # The file may not exist (nothing to write) OR may exist with + # an empty rules map; both are valid. The contract is "no + # spurious counters", not "no file ever". + path = tmp_path / "global" / "rules.stats.json" + if path.exists(): + data = json.loads(path.read_text()) + assert data.get("rules") == {} + + +class TestConsumeRetrievedThisTurn: + """Phase C — outcome bridge takes a per-turn snapshot of which + rule IDs landed in the prompt, so the ACC can ask "did the LLM + actually see this rule?" before bumping the ignored counter.""" + + @pytest.mark.asyncio + async def test_set_accumulates_then_clears_on_consume(self, tmp_path: Path): + cortex = _make_cortex(tmp_path) + _seed_when_rules(cortex.global_hc, [ + "Use httpx instead of requests", + "For CSV files use low_memory=False", + ]) + await cortex.build_memory_context("call an API") + + # Two rules retrieved → set has two IDs. 
+ snapshot = cortex.consume_retrieved_this_turn() + assert len(snapshot) == 2 + + # Second consume in the same turn is empty (take + clear). + again = cortex.consume_retrieved_this_turn() + assert again == set() + + @pytest.mark.asyncio + async def test_set_accumulates_across_multiple_builds_in_one_turn( + self, tmp_path: Path + ): + # `_build_system_prompt` runs build_memory_context more than + # once per turn on certain recovery paths. The retrieval set + # should NOT reset between those builds — it should drain + # only on consume (end-of-turn). + cortex = _make_cortex(tmp_path) + _seed_when_rules(cortex.global_hc, ["Use httpx instead of requests"]) + + await cortex.build_memory_context("call an API") + await cortex.build_memory_context("retry the API call") + snapshot = cortex.consume_retrieved_this_turn() + # Same rule, multiple builds → one ID in the set. + assert len(snapshot) == 1 diff --git a/tests/test_memory_rankings.py b/tests/test_memory_rankings.py new file mode 100644 index 00000000..57f48e21 --- /dev/null +++ b/tests/test_memory_rankings.py @@ -0,0 +1,104 @@ +"""Test the `/memory rankings` debug surface (Layer 3 — Phase D). + +Light-touch tests — we don't reproduce the rich-console formatting +verbatim, we just confirm: + + - The handler runs without errors when stats are absent. + - Recorded retrievals + ignored counts appear in the output. + - Rule text is rendered (so a developer running the command sees + something useful, not just numbers). +""" + +from __future__ import annotations + +from pathlib import Path + +import pytest +from rich.console import Console + +from anton.core.memory.cortex import Cortex +from anton.core.memory.hippocampus import Hippocampus + + +class _StubSettings: + """Minimal settings stand-in. 
MemoryManage only ever reads attrs + we set explicitly in its handlers — none for rankings.""" + + pass + + +def _make_manager(tmp_path: Path, console: Console): + from anton.memory.manage import MemoryManage + + global_dir = tmp_path / "global" + project_dir = tmp_path / "project" + global_dir.mkdir(parents=True, exist_ok=True) + project_dir.mkdir(parents=True, exist_ok=True) + + cortex = Cortex( + global_hc=Hippocampus(global_dir), + project_hc=Hippocampus(project_dir), + mode="autopilot", + ) + return MemoryManage( + console=console, + settings=_StubSettings(), + cortex=cortex, + ) + + +@pytest.mark.asyncio +async def test_rankings_with_no_rules_prints_empty_notice(tmp_path: Path): + buf = Console(record=True) + manager = _make_manager(tmp_path, buf) + await manager.rankings() + output = buf.export_text() + assert "No rules in memory" in output + + +@pytest.mark.asyncio +async def test_rankings_shows_recorded_counts(tmp_path: Path): + buf = Console(record=True) + manager = _make_manager(tmp_path, buf) + cortex = manager.cortex + + # Seed two rules and simulate two retrievals + one ignored on the + # first. Use the stats API directly — testing the surface, not the + # capture path (that's covered in the cortex integration tests). + cortex.global_hc.encode_rule( + "Use httpx instead of requests", + kind="when", confidence="high", source="user", + ) + cortex.global_hc.encode_rule( + "For CSV files use low_memory=False", + kind="when", confidence="high", source="user", + ) + cortex._rule_stats.record_retrieval("Use httpx instead of requests") + cortex._rule_stats.record_retrieval("Use httpx instead of requests") + cortex._rule_stats.record_ignored("Use httpx instead of requests") + cortex._rule_stats.flush() + + await manager.rankings() + output = buf.export_text() + # Rule text appears. + assert "httpx" in output + assert "low_memory" in output + # Counters appear (retrievals=2, ignored=1 for the first rule). 
+ # Check the counters on the httpx rule's own row rather than + # pinning column positions — that lets us refactor formatting + # without breaking the test. (A bare `"2" in output` would pass on + # any digit anywhere, e.g. inside the LAST date column.) + httpx_row = next(line for line in output.splitlines() if "httpx" in line) + tokens = httpx_row.split() + assert "2" in tokens # retrieval count + assert "1" in tokens # ignored count + + +@pytest.mark.asyncio +async def test_rankings_with_remote_backend_explains_absence(tmp_path: Path): + # If the cortex has no rule_stats (e.g. remote hippocampus with no + # local dir), the command should explain rather than crash or + # render an empty table. + buf = Console(record=True) + manager = _make_manager(tmp_path, buf) + manager.cortex._rule_stats = None # simulate remote-backend scenario + await manager.rankings() + output = buf.export_text() + assert "rule-stats backend" in output or "No rule-stats" in output diff --git a/tests/test_ranker.py b/tests/test_ranker.py new file mode 100644 index 00000000..d8c98387 --- /dev/null +++ b/tests/test_ranker.py @@ -0,0 +1,136 @@ +"""Tests for the BM25 ranker (Layer 3 — Phase B). + +The ranker decides which `## When` rules land in the system prompt +when the rule corpus exceeds the token budget. These tests pin the +core BM25 contract (relevance signals lift the right rules), the +edge cases (empty inputs), and the budget arithmetic (floor + cap). +""" + +from __future__ import annotations + +from anton.core.memory.ranker import Ranker, RankedRule, tokenize + + +class TestTokenize: + def test_lowercases_and_drops_punct(self): + assert tokenize("Use ONE scratchpad name per task!") == [ + "use", "one", "scratchpad", "name", "per", "task" + ] + + def test_drops_stopwords(self): + # 'the' / 'and' / 'of' / 'to' are stopwords. + assert "the" not in tokenize("the scratchpad") + assert "and" not in tokenize("save and exit") + assert "of" not in tokenize("a lot of csv") + assert "to" not in tokenize("write to disk") + + def test_keeps_numbers(self): + # "5 KB" should keep "5" because rules like "cells over 5 KB" + # leverage numeric matching against user messages.
+ toks = tokenize("Keep cells under 5 KB") + assert "5" in toks + assert "kb" in toks + + def test_empty_input_returns_empty(self): + assert tokenize("") == [] + assert tokenize(None) == [] # type: ignore[arg-type] + + +class TestRankRelevance: + """The headline behaviour: relevant rules outrank irrelevant ones.""" + + def test_pandas_query_lifts_pandas_rule(self): + rules = [ + "Use httpx instead of requests for HTTP calls", + "For CSV files with mixed types, pass low_memory=False to pd.read_csv", + "Render HTML reports with explicit charset utf-8", + ] + ranked = Ranker().rank(rules, "process the sales.csv with pandas") + # The pandas/csv rule should be at the top. + assert "pd.read_csv" in ranked[0].text + + def test_html_query_lifts_html_rule(self): + rules = [ + "Use httpx instead of requests for HTTP calls", + "For CSV files with mixed types, pass low_memory=False to pd.read_csv", + "Render HTML reports with explicit charset utf-8", + ] + ranked = Ranker().rank(rules, "build a dashboard HTML report") + assert "HTML" in ranked[0].text + + def test_completely_irrelevant_query_still_returns_all_rules(self): + rules = [ + "Use httpx instead of requests", + "For CSV files use low_memory=False", + ] + ranked = Ranker().rank(rules, "blueberries are tasty") + # No matches → all scores zero, but we still return every rule + # so the budget step has something to pick from. + assert len(ranked) == 2 + assert all(r.score == 0.0 for r in ranked) + + +class TestRankEdgeCases: + def test_empty_corpus(self): + assert Ranker().rank([], "anything") == [] + + def test_empty_query_returns_input_order(self): + rules = ["alpha rule", "beta rule", "gamma rule"] + ranked = Ranker().rank(rules, "") + assert [r.text for r in ranked] == rules + assert all(r.score == 0.0 for r in ranked) + + def test_query_with_only_stopwords_returns_input_order(self): + # "the and of to" tokenizes to nothing → cold-start fallback. 
+ rules = ["alpha rule", "beta rule"] + ranked = Ranker().rank(rules, "the and of to") + assert [r.text for r in ranked] == rules + + def test_token_estimate_is_positive(self): + ranked = Ranker().rank(["short rule"], "rule") + assert ranked[0].token_estimate >= 1 + + def test_stable_tie_preserves_input_order(self): + # Identical-on-the-query rules should keep input order. + rules = ["pandas a", "pandas b", "pandas c"] + ranked = Ranker().rank(rules, "pandas") + # All three contain only "pandas" → identical scores → input order. + assert [r.text for r in ranked] == rules + + +class TestSelectWithinBudget: + def _make(self, n: int, tokens_each: int = 10) -> list[RankedRule]: + # Descending scores so the order in the input is the rank order. + return [ + RankedRule(text=f"rule-{i}", score=float(n - i), token_estimate=tokens_each) + for i in range(n) + ] + + def test_budget_caps_selection(self): + ranked = self._make(10, tokens_each=20) + out = Ranker().select_within_budget(ranked, budget_tokens=85) + # Budget = 85 tokens, each rule 20 → 4 rules fit (80 used; 5th + # would push to 100). With floor_k=3 the first 3 are auto-loaded, + # then the 4th fits within budget. + assert len(out) == 4 + + def test_floor_k_loaded_even_when_budget_too_small(self): + ranked = self._make(10, tokens_each=50) + out = Ranker().select_within_budget(ranked, budget_tokens=10, floor_k=3) + # Budget too small for even one rule, but floor_k=3 forces top-3. + assert len(out) == 3 + + def test_cap_k_limits_even_with_huge_budget(self): + ranked = self._make(50, tokens_each=1) + out = Ranker().select_within_budget(ranked, budget_tokens=10_000, cap_k=20) + # 50 rules × 1 token each easily fits, but cap_k caps at 20. + assert len(out) == 20 + + def test_floor_k_capped_by_input_size(self): + ranked = self._make(2, tokens_each=10) + out = Ranker().select_within_budget(ranked, budget_tokens=5, floor_k=5) + # Only 2 rules to choose from → floor_k can't add more. 
+ assert len(out) == 2 + + def test_empty_ranked_returns_empty(self): + assert Ranker().select_within_budget([], budget_tokens=100) == [] diff --git a/tests/test_rule_stats.py b/tests/test_rule_stats.py new file mode 100644 index 00000000..ac9e0f98 --- /dev/null +++ b/tests/test_rule_stats.py @@ -0,0 +1,155 @@ +"""Tests for the rule-stats sidecar (Layer 3 — Phase A). + +The sidecar is the storage half of retrieval-scoring: it tracks how +often a rule lands in the system prompt (retrievals) and how often it +was loaded while the pattern it warns against still fired (ignored). + +These tests pin: + - Hash stability across whitespace / case + - Counter increment + persistence + - Atomic write semantics (no half-written files) + - Buffer/flush behavior (no I/O on mutation) + - Corruption recovery (unreadable file → fresh state, not crash) + - Forward-compat: missing `version` field auto-populated +""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from anton.core.memory.rule_stats import RuleStats, rule_id + + +class TestRuleId: + def test_stable_across_calls(self): + a = rule_id("Use ONE scratchpad name per task") + b = rule_id("Use ONE scratchpad name per task") + assert a == b + + def test_case_and_whitespace_insensitive(self): + a = rule_id("Use ONE scratchpad name per task") + b = rule_id(" use one SCRATCHPAD name per task ") + assert a == b + + def test_changes_on_content_change(self): + a = rule_id("Use ONE scratchpad name per task") + b = rule_id("Use ONE scratchpad name per task and reuse it") + assert a != b + + def test_empty_input_does_not_crash(self): + # Defensive — record_retrieval no-ops on empty input, but the + # hash function itself should still be callable. + assert rule_id("") == rule_id("") + assert rule_id(None) == rule_id("") # type: ignore[arg-type] + + def test_id_is_64bit_hex(self): + rid = rule_id("anything at all") + assert len(rid) == 16 + # All chars hex. 
+ int(rid, 16) + + +class TestRecordAndPersist: + def test_record_buffers_in_memory(self, tmp_path: Path): + stats = RuleStats(tmp_path / "rules.stats.json") + stats.record_retrieval("rule A") + # No file written yet — record_* is supposed to be a buffer hit, + # not a disk hit. This is the contract that lets a turn flush + # once instead of once per rule. + assert not (tmp_path / "rules.stats.json").exists() + # But the in-memory state reflects the bump. + assert stats.get("rule A")["retrievals"] == 1 + + def test_flush_persists_state(self, tmp_path: Path): + stats = RuleStats(tmp_path / "rules.stats.json") + stats.record_retrieval("rule A") + stats.flush() + # Read it back through a fresh instance. + again = RuleStats(tmp_path / "rules.stats.json") + assert again.get("rule A")["retrievals"] == 1 + + def test_flush_is_idempotent_when_clean(self, tmp_path: Path): + # Two flushes back-to-back shouldn't double-write or fail. + stats = RuleStats(tmp_path / "rules.stats.json") + stats.record_retrieval("rule A") + stats.flush() + mtime_after_first = (tmp_path / "rules.stats.json").stat().st_mtime + stats.flush() # nothing dirty → no-op + mtime_after_second = (tmp_path / "rules.stats.json").stat().st_mtime + assert mtime_after_first == mtime_after_second + + def test_multiple_records_one_flush(self, tmp_path: Path): + # The whole point of the buffer pattern. 
+ stats = RuleStats(tmp_path / "rules.stats.json") + for _ in range(5): + stats.record_retrieval("rule A") + stats.record_retrieval("rule B") + stats.flush() + again = RuleStats(tmp_path / "rules.stats.json") + assert again.get("rule A")["retrievals"] == 5 + assert again.get("rule B")["retrievals"] == 1 + + def test_record_ignored_separate_counter(self, tmp_path: Path): + stats = RuleStats(tmp_path / "rules.stats.json") + stats.record_retrieval("rule A") + stats.record_retrieval("rule A") + stats.record_ignored("rule A") + stats.flush() + rec = RuleStats(tmp_path / "rules.stats.json").get("rule A") + assert rec["retrievals"] == 2 + assert rec["ignored"] == 1 + + def test_last_retrieved_updates(self, tmp_path: Path): + stats = RuleStats(tmp_path / "rules.stats.json") + stats.record_retrieval("rule A") + first = stats.get("rule A")["last_retrieved"] + assert first is not None + # Bump again; timestamp should not be None. + stats.record_retrieval("rule A") + second = stats.get("rule A")["last_retrieved"] + # Both are ISO timestamps; equality across rapid bumps is OK, + # what we care about is that the field gets set on every bump. + assert second is not None + + def test_record_blank_input_is_noop(self, tmp_path: Path): + stats = RuleStats(tmp_path / "rules.stats.json") + stats.record_retrieval("") + stats.record_retrieval(" ") + stats.record_ignored("") + stats.flush() + assert stats.all() == {} + + +class TestCorruptionRecovery: + def test_unreadable_file_falls_back_to_fresh_state(self, tmp_path: Path): + # Write garbage at the stats path; constructor should not crash. + path = tmp_path / "rules.stats.json" + path.write_text("{this is not valid json", encoding="utf-8") + stats = RuleStats(path) + # Fresh state — no rules, default version. + assert stats.all() == {} + # And the next write should cleanly overwrite the garbage. 
+ stats.record_retrieval("rule A") + stats.flush() + again = RuleStats(path) + assert again.get("rule A")["retrievals"] == 1 + + def test_wrong_shape_falls_back_to_fresh_state(self, tmp_path: Path): + path = tmp_path / "rules.stats.json" + path.write_text('{"rules": "not a dict"}', encoding="utf-8") + stats = RuleStats(path) + assert stats.all() == {} + + +class TestAll: + def test_all_returns_snapshot(self, tmp_path: Path): + stats = RuleStats(tmp_path / "rules.stats.json") + stats.record_retrieval("rule A") + stats.record_retrieval("rule B") + snapshot = stats.all() + assert len(snapshot) == 2 + # Modifying the snapshot must not affect internal state. + next(iter(snapshot.values()))["retrievals"] = 999 + assert stats.get("rule A")["retrievals"] == 1 diff --git a/tests/test_session_acc_init.py b/tests/test_session_acc_init.py index 1a0f6732..8d6e3ce0 100644 --- a/tests/test_session_acc_init.py +++ b/tests/test_session_acc_init.py @@ -30,22 +30,53 @@ from anton.core.session import ChatSession +class FakeRuleStats: + """Captures record_ignored calls so the outcome-bridge test can + assert on them without instantiating real file I/O.""" + + def __init__(self): + self.ignored: list[str] = [] + self.flushed = 0 + + def record_ignored(self, rule_text: str) -> None: + self.ignored.append(rule_text) + + def flush(self) -> None: + self.flushed += 1 + + class FakeCortex: """Minimal stand-in for Cortex used by _schedule_acc_flush. Records every batch of engrams passed to `encode()` so the test can - inspect exactly what would land in long-term memory. + inspect exactly what would land in long-term memory. Also stubs + `consume_retrieved_this_turn` + `_rule_stats` for Phase C tests. 
""" - def __init__(self, mode: str = "autopilot"): + def __init__( + self, + mode: str = "autopilot", + *, + retrieved_ids: set[str] | None = None, + rule_stats: FakeRuleStats | None = None, + ): self.mode = mode self.encoded: list[list[Engram]] = [] self.global_hc = SimpleNamespace(recall_rules=lambda: "") + self._retrieved_ids = retrieved_ids if retrieved_ids is not None else set() + self._rule_stats = rule_stats + self.consume_calls = 0 async def encode(self, engrams: list[Engram]) -> list[str]: self.encoded.append(list(engrams)) return [f"encoded:{e.kind}" for e in engrams] + def consume_retrieved_this_turn(self) -> set[str]: + self.consume_calls += 1 + out = self._retrieved_ids + self._retrieved_ids = set() + return out + def _make_session_stub(cortex: FakeCortex, *, acc_mode: str = "passive") -> SimpleNamespace: """Build the smallest object that satisfies _acc_observe, @@ -260,6 +291,88 @@ def test_off_mode_skips_nudge_entirely(self): assert n == 0 assert s._acc.events == () +class TestOutcomeBridge: + """Phase C — when an ACC lesson fires AND its corresponding rule + was loaded into this turn's system prompt, the cortex's rule + stats sidecar gets an `ignored` bump. High ignored counts are + the consolidator's signal to rewrite / escalate / replace. + + These tests pin the wiring contract; the cortex-side correlation + is covered in test_cortex_ranker_integration.py.""" + + @pytest.mark.asyncio + async def test_ignored_bumped_for_lessons_whose_rule_was_retrieved(self): + from anton.core.memory.rule_stats import rule_id as _rid + + stats = FakeRuleStats() + # Pre-seed retrieved_ids with the rule-id matching the name-switch + # lesson (which the events below will fire). + # We can't easily know the lesson text in advance; instead we + # populate retrieved_ids based on what the detector will produce. 
+ from anton.core.memory.acc import detect_name_switch, Event + synthetic_events = [ + Event("scratchpad_call", 1, {"name": "a"}, 1), + Event("scratchpad_call", 1, {"name": "b"}, 2), + ] + expected_lesson = detect_name_switch(synthetic_events) + assert expected_lesson is not None + retrieved_ids = {_rid(expected_lesson.rule)} + + cortex = FakeCortex(retrieved_ids=retrieved_ids, rule_stats=stats) + s = _make_session_stub(cortex) + # Replay the same events into the real ACC. + s._acc_observe("scratchpad_call", {"name": "a", "code_len": 100}, round_idx=1) + s._acc_observe("scratchpad_call", {"name": "b", "code_len": 100}, round_idx=2) + + s._schedule_acc_flush() + await asyncio.sleep(0) + + # The retrieved rule got an `ignored` bump because the same + # pattern fired despite the rule being loaded. + assert len(stats.ignored) == 1 + assert "ONE scratchpad" in stats.ignored[0] + assert stats.flushed >= 1 + + @pytest.mark.asyncio + async def test_no_ignored_bump_when_rule_was_not_retrieved(self): + # ACC fires a lesson but no rule version of it was loaded this + # turn → no ignored bump. Bumping here would over-penalise new + # rules (the LLM never saw them, so it couldn't have ignored + # them). + stats = FakeRuleStats() + cortex = FakeCortex(retrieved_ids=set(), rule_stats=stats) + s = _make_session_stub(cortex) + s._acc_observe("scratchpad_call", {"name": "a", "code_len": 100}, round_idx=1) + s._acc_observe("scratchpad_call", {"name": "b", "code_len": 100}, round_idx=2) + + s._schedule_acc_flush() + await asyncio.sleep(0) + + assert stats.ignored == [] + + @pytest.mark.asyncio + async def test_no_lessons_means_no_consume_or_bump(self): + # When no detectors fire, the outcome bridge has nothing to + # record. Despite this test's name, the retrieval set IS still + # drained at end-of-turn — only the `ignored` bump is skipped.
+ stats = FakeRuleStats() + cortex = FakeCortex(retrieved_ids={"abc"}, rule_stats=stats) + s = _make_session_stub(cortex) + # One scratchpad_call → no detector fires (name_switch needs 2). + s._acc_observe("scratchpad_call", {"name": "solo", "code_len": 50}) + + s._schedule_acc_flush() + await asyncio.sleep(0) + + # consume_retrieved_this_turn IS called (the bridge wants to + # know which rules were available), but no bumps happen + # because no lessons fired. + assert stats.ignored == [] + + +class TestDetectorExceptionDoesNotCrash: + """Restored from earlier inline placeholder.""" + def test_detector_exception_does_not_crash_turn(self): # Wire a broken detector and verify the nudge wrapper swallows # the exception. Layer 1's end-of-turn flush still runs