Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 166 additions & 20 deletions anton/README.md

Large diffs are not rendered by default.

744 changes: 744 additions & 0 deletions anton/core/memory/acc.py

Large diffs are not rendered by default.

239 changes: 239 additions & 0 deletions anton/core/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@
from collections.abc import AsyncIterator, Callable
from dataclasses import asdict, dataclass, field
import json
import os
from typing import TYPE_CHECKING

from anton.core.backends.base import Cell, ScratchpadRuntimeFactory
from anton.core.backends.local import local_scratchpad_runtime_factory
from anton.core.datasources.data_vault import DataVault
from anton.core.llm.prompt_builder import ChatSystemPromptBuilder, SystemPromptContext
from anton.core.memory.acc import AnteriorCingulate
from anton.core.memory.base import Engram
from anton.core.memory.cerebellum import Cerebellum
from anton.core.memory.skills import SkillStore
from anton.core.tools.recall_skill import RECALL_SKILL_TOOL
Expand Down Expand Up @@ -146,10 +149,55 @@ def __init__(self, config: ChatSessionConfig) -> None:
cortex=self._cortex,
llm=self._llm,
)
# Anterior Cingulate Cortex: turn-level pattern detection.
# Where the cerebellum looks at one cell and asks "did this
# cell do what it claimed", the ACC looks at the whole turn
# and asks "is the same failure pattern firing more than
# once". Emit points are scattered (scratchpad dispatcher,
# tool dispatch, history-repair, round-cap) rather than
# routed through the scratchpad observer list, because most
# of what the ACC watches isn't scratchpad-scoped. The
# session holds the reference; emit sites call
# `session._acc.observe(kind, detail, ...)` directly.
#
# has_similar_lesson: cheap substring check against the
# current rules.md content. Avoids re-encoding the same
# rule every turn. Embedding similarity is a v2 upgrade.
def _acc_has_similar(rule: str) -> bool:
cortex = getattr(self, "_cortex", None)
hc = getattr(cortex, "global_hc", None) if cortex else None
if hc is None:
return False
try:
existing = hc.recall_rules() or ""
except Exception:
return False
probe = (rule or "")[:60].lower()
return bool(probe) and probe in existing.lower()

self._acc = AnteriorCingulate(has_similar_lesson=_acc_has_similar)
# ANTON_ACC_MODE controls how aggressively ACC affects the
# turn. Mirrors ANTON_MEMORY_MODE for shape consistency:
# "off" — ACC observes nothing (skipped at every emit site).
# "passive" — Layer 1: lessons drain to memory at end-of-turn,
# next turn's system prompt picks them up. SAFE
# DEFAULT — adds no surface-area to the turn loop.
# "active" — Layer 2: ALSO inject lessons inline as text
# blocks in tool_results so the LLM sees them on
# the very next round. Stronger learning signal,
# but more invasive — the LLM has to handle the
# nudge gracefully without confusing it for a
# user instruction.
_mode_raw = os.environ.get("ANTON_ACC_MODE", "passive").strip().lower()
self._acc_mode = _mode_raw if _mode_raw in ("off", "passive", "active") else "passive"
# Scratchpad observers — list of objects with on_pre_execute /
# on_post_execute. Fired by handle_scratchpad around pad.execute.
# The runtime never sees this list; observation lives at the
# dispatcher layer to keep local/remote runtimes interchangeable.
# ACC is intentionally NOT in this list — its emit footprint
# is broader than scratchpad cells (it also needs to see tool
# calls, history repairs, the round cap), so it's wired via
# direct `session._acc.observe(...)` at each emit site.
self._scratchpad_observers: list = [self._cerebellum]
self._explainability_store = (
ExplainabilityStore(config.workspace.base) if config.workspace is not None else None
Expand Down Expand Up @@ -770,6 +818,15 @@ def _seal_dangling_tool_uses(self, reason: str = "interrupted") -> int:
last_assistant_idx + 1,
{"role": "user", "content": synth_blocks},
)
# ACC: emit history_repair so detect_repair_churn can fire
# when the LLM is generating malformed tool_use/result pairs
# repeatedly. One repair is a hiccup; three in a turn is the
# conversation derailing.
self._acc_observe(
"history_repair",
{"reason": reason, "sealed_count": len(missing)},
severity=5,
)
return len(missing)

def hard_truncate_history(self, keep: int = 4) -> None:
Expand Down Expand Up @@ -933,6 +990,141 @@ def factory_validated():
async for event in self._llm.plan_stream(messages=factory_validated(), **kwargs):
yield event

def _acc_observe(
self,
kind: str,
detail: dict | None = None,
*,
severity: int = 1,
round_idx: int = 0,
) -> None:
"""Safe-emit wrapper for ACC events.

Returns silently when:
- the ACC isn't attached (defensive — should always be set),
- the cortex is disabled (`mode == "off"`), so observation
without persistence is pointless,
- `observe()` raises (e.g. unknown kind from a stale call site).

Emit sites call this rather than touching `self._acc` directly
so that adding/renaming kinds, or turning the ACC off via a
future env var, lives in one place.
"""
acc = getattr(self, "_acc", None)
if acc is None:
return
if getattr(self, "_acc_mode", "passive") == "off":
return
cortex = getattr(self, "_cortex", None)
if cortex is not None and getattr(cortex, "mode", "") == "off":
return
try:
acc.observe(kind, detail or {}, severity=severity, round_idx=round_idx)
except ValueError:
# Unknown event kind from a stale emit site — log via the
# cerebellum's logger contract once we have one; for now,
# swallow so observation drift never breaks a turn.
pass

def _acc_maybe_nudge(self, tool_results: list[dict]) -> int:
"""Layer 2 — mid-turn nudging.

If `ANTON_ACC_MODE == "active"`, run the ACC's per-round
detection pass and append any newly-fired lessons as text
blocks INSIDE the `tool_results` content list. They piggy-back
on the user-role message that's about to be appended to
history, so the LLM sees them on its very next round.

Why text blocks alongside tool_result blocks (vs. a separate
user message)? Anthropic's API allows a user message to mix
types in its content array. Reusing the same message keeps the
nudge tightly bound to the round that produced it and avoids
introducing a new consecutive-user-message edge case that the
history validator would have to learn about.

Returns the number of nudges appended (mostly for tests /
observability). Zero in passive mode, zero when no detectors
newly fired.
"""
if getattr(self, "_acc_mode", "passive") != "active":
return 0
acc = getattr(self, "_acc", None)
if acc is None:
return 0
try:
lessons = acc.at_round_n()
except Exception:
# Defensive: a buggy detector should never crash the turn.
# Layer 1 still drains at end-of-turn so we lose nothing.
return 0
if not lessons:
return 0
for lesson in lessons:
tool_results.append({
"type": "text",
"text": (
f"[Anton self-check — {lesson.detector}] {lesson.rule} "
"(This is an automatic mid-turn observation from your own "
"monitoring layer, not a user message.)"
),
})
return len(lessons)

def _schedule_acc_flush(self) -> None:
"""Drain the ACC's turn buffer into Engrams and clear it.

Parallel to `_schedule_cerebellum_flush()`: same fire-and-
forget contract, same end-of-turn slot. The ACC's detectors
are pure functions (no LLM call), so running them is cheap;
the only async work is `cortex.encode()`, which writes the
lessons to disk. We still wrap it in `asyncio.create_task`
so the user-facing reply isn't blocked on file I/O.

Best-effort: if there's no event loop (sync test, edge case),
we drop the buffer rather than raise.
"""
acc = getattr(self, "_acc", None)
if acc is None:
return
cortex = getattr(self, "_cortex", None)
if cortex is None or getattr(cortex, "mode", "") == "off":
acc.clear()
return

lessons = acc.at_end_of_turn()
if not lessons:
acc.clear()
return

engrams = [
Engram(
text=l.rule,
kind=l.kind, # always / never / when from the detector
scope="global", # ACC lessons are cross-project
confidence="high", # detectors only fire on confirmed patterns
source="consolidation",
)
for l in lessons
]

# Check for a running event loop first so we don't construct a
# coroutine object only to drop it (which triggers an unawaited-
# coroutine warning). ACC learning is best-effort, same as
# cerebellum learning — if there's no loop we drop the buffer.
try:
asyncio.get_running_loop()
except RuntimeError:
acc.clear()
return

async def _drain() -> None:
try:
await cortex.encode(engrams)
finally:
acc.clear()

asyncio.create_task(_drain())

def _schedule_cerebellum_flush(self) -> None:
"""Fire the cerebellum's batched diff pass without blocking the turn.

Expand Down Expand Up @@ -1073,6 +1265,7 @@ async def turn(self, user_input: str | list[dict]) -> str:
# in the background. Brain analogue: cerebellar plasticity
# operates in parallel with continued action, not blocking it.
self._schedule_cerebellum_flush()
self._schedule_acc_flush()

return reply

Expand Down Expand Up @@ -1196,6 +1389,7 @@ async def turn_stream(
# the non-streaming turn. Lets the user-facing stream finish
# immediately while supervised error learning runs in the background.
self._schedule_cerebellum_flush()
self._schedule_acc_flush()

async def _stream_and_handle_tools(
self, user_message: str = ""
Expand Down Expand Up @@ -1274,6 +1468,12 @@ async def _stream_and_handle_tools(
tool_round += 1
if tool_round > self._max_tool_rounds:
_max_rounds_hit = True
self._acc_observe(
"cap_exhausted",
{"cap": self._max_tool_rounds},
severity=9,
round_idx=tool_round,
)
self._append_history(
{"role": "assistant", "content": llm_response.content or ""}
)
Expand Down Expand Up @@ -1317,6 +1517,17 @@ async def _stream_and_handle_tools(

tool_results: list[dict] = []
for tc in llm_response.tool_calls:
# ACC: tool_call emit. Args_summary is intentionally
# truncated — the ACC vocabulary documents it as a
# summary string, not a full payload. Detectors
# don't read args today; this is reserved for a
# future `detect_orphaned_tool_call`.
self._acc_observe(
"tool_call",
{"name": tc.name, "args_summary": str(tc.input)[:120]},
severity=1,
round_idx=tool_round,
)
if self._episodic is not None:
self._episodic.log_turn(
self._turn_count + 1,
Expand Down Expand Up @@ -1481,6 +1692,28 @@ async def _stream_and_handle_tools(
result_text = self._apply_error_tracking(
result_text, tc.name, error_streak, resilience_nudged
)
# ACC: tool_result emit. Heuristic success-detection
# from the result text — anton-core does not have a
# structured success/error envelope at this layer,
# so we look for the conventional "Tool 'X' failed"
# prefix that the exception branch above sets, plus
# any handler that prefixed its return with "Error:"
# or the dispatcher's own error-tracking markers.
_failed = (
f"Tool '{tc.name}' failed:" in result_text
or result_text.startswith("Error:")
or "ERROR:" in result_text[:200].upper()
)
self._acc_observe(
"tool_result",
{
"name": tc.name,
"success": not _failed,
"error": result_text[:300] if _failed else "",
},
severity=5 if _failed else 1,
round_idx=tool_round,
)
tool_results.append(
{
"type": "tool_result",
Expand All @@ -1489,6 +1722,12 @@ async def _stream_and_handle_tools(
}
)

# ACC Layer 2 — mid-turn nudge. No-op when mode != "active"
# or when no new patterns fired this round. When it does
# fire, the lesson text appears inline alongside tool_results
# so the LLM sees the alarm before its next decision.
self._acc_maybe_nudge(tool_results)

self._append_history({"role": "user", "content": tool_results})

# Signal that tools are done and LLM is now reasoning
Expand Down
52 changes: 52 additions & 0 deletions anton/core/tools/tool_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,12 +294,34 @@ async def handle_scratchpad(session: ChatSession, tc_input: dict) -> str:
if not name:
return "Scratchpad name is required."

# ACC emit helper: use the session's safe wrapper if it exists,
# otherwise no-op. Defined as a local closure so each emit site
# stays a single line.
def _acc_observe(kind: str, detail: dict, *, severity: int = 1) -> None:
fn = getattr(session, "_acc_observe", None)
if fn is not None:
fn(kind, detail, severity=severity)

if action == "exec":
result = await prepare_scratchpad_exec(session, tc_input)
if isinstance(result, str):
# Empty / malformed code parameter — the dispatcher rejected
# it before reaching the runtime. This is exactly the
# "silent code-clip" failure mode the ACC's
# detect_oversized_cell watches for.
_acc_observe("scratchpad_empty_code", {"name": name}, severity=7)
return result
pad, code, description, estimated_time, estimated_seconds = result

_acc_observe(
"scratchpad_call",
{
"name": name,
"code_len": len(code or ""),
"one_line_description": description or "",
},
)

# Notify pre-execute observers (e.g. cerebellum). The runtime
# never sees these — observation is an orchestration concern,
# so it lives at the dispatcher layer where the data is most
Expand All @@ -325,6 +347,31 @@ async def handle_scratchpad(session: ChatSession, tc_input: dict) -> str:
pad_name=name, description=description, cell=cell,
)
await _fire_post_execute(session, cell)
# ACC: distinguish "killed" (timeout/cancel/OOM) from a
# plain runtime error. The local backend sets cell.error
# to a string starting with "Cancelled" or matching the
# "Cell timed out"/"Cell killed" prefixes from the
# asyncio.TimeoutError path. Everything else (NameError,
# ImportError, …) is a regular result with success=False.
err = (cell.error or "").strip()
if err.startswith(("Cancelled", "Cell timed out", "Cell killed")):
_acc_observe(
"scratchpad_killed",
{"name": name, "reason": err[:120]},
severity=6,
)
else:
success = not err and not (cell.stderr or "").strip()
_acc_observe(
"scratchpad_result",
{
"name": name,
"success": success,
"stdout_len": len(cell.stdout or ""),
"error": err[:300] if err else "",
},
severity=5 if not success else 1,
)
return format_cell_result(cell)

elif action == "view":
Expand All @@ -338,6 +385,11 @@ async def handle_scratchpad(session: ChatSession, tc_input: dict) -> str:
if pad is None:
return f"No scratchpad named '{name}'."
await pad.reset()
_acc_observe(
"scratchpad_reset",
{"name": name, "reason": "manual"},
severity=5,
)
return f"Scratchpad '{name}' reset. All state cleared."

elif action == "remove":
Expand Down
Loading
Loading