From 01e296c7075aa0960131f3b6d8f12211b515e61f Mon Sep 17 00:00:00 2001 From: Max Abouchar Date: Fri, 15 May 2026 15:32:29 -0700 Subject: [PATCH 1/2] add tracing logic --- anton/core/llm/openai.py | 54 +++++++++++++++++++++++++++++++++++++++ anton/core/llm/tracing.py | 51 ++++++++++++++++++++++++++++++++++++ anton/core/session.py | 46 +++++++++++++++++++++++++++++++-- 3 files changed, 149 insertions(+), 2 deletions(-) create mode 100644 anton/core/llm/tracing.py diff --git a/anton/core/llm/openai.py b/anton/core/llm/openai.py index 1fa46116..b97505e8 100644 --- a/anton/core/llm/openai.py +++ b/anton/core/llm/openai.py @@ -1,6 +1,7 @@ from __future__ import annotations import json +import os from collections.abc import AsyncIterator import openai @@ -222,6 +223,24 @@ def __init__( self._ssl_verify = ssl_verify self._api_version = api_version self._supports_vision = supports_vision + # Whether to attach langfuse-style headers (Langfuse-Session-Id, + # Langfuse-Tags, Langfuse-Metadata) to outbound requests. Default-on + # only for the MindsHub-backed deployment, which is the curated + # langfuse-aware router we ship against. For every other openai- + # compatible endpoint (raw OpenAI, Azure, Gemini, self-hosted + # vLLM/ollama/LM Studio) we skip by default so the cowork session + # identity doesn't leak into third-party logs. + # + # Power-user opt-in: set `ANTON_LANGFUSE_HEADERS=1` to force-emit + # the headers regardless of base URL — useful when the user has + # pointed `base_url` at their own langfuse-instrumented proxy. 
+ self._emit_trace_headers = bool(base_url) and ( + "mindshub.ai" in base_url or "mdb.ai" in base_url + ) + if os.environ.get("ANTON_LANGFUSE_HEADERS", "").strip().lower() in { + "1", "true", "yes", "on", + }: + self._emit_trace_headers = True import httpx @@ -255,6 +274,35 @@ def export_connection_info(self) -> ProviderConnectionInfo: api_version=self._api_version, ) + def _build_trace_headers(self) -> dict[str, str] | None: + """Return langfuse-style headers for the active trace, or None. + + Returns None unless trace-header emission is enabled for this + provider instance (default-on for MindsHub, opt-in for any other + openai-compatible endpoint via `ANTON_LANGFUSE_HEADERS=1`) AND a + `TraceContext` has been installed by `ChatSession.turn_stream`. + """ + if not self._emit_trace_headers: + return None + from .tracing import get_trace_context + + ctx = get_trace_context() + if ctx is None: + return None + headers: dict[str, str] = {} + if ctx.session_id: + headers["Langfuse-Session-Id"] = ctx.session_id + if ctx.harness: + headers["Langfuse-Tags"] = ctx.harness + extra: dict[str, object] = {} + if ctx.turn_id is not None: + extra["turn_id"] = ctx.turn_id + if ctx.harness: + extra["harness"] = ctx.harness + if extra: + headers["Langfuse-Metadata"] = json.dumps(extra) + return headers or None + async def complete( self, *, @@ -276,6 +324,9 @@ async def complete( kwargs["tools"] = _translate_tools(tools) if tool_choice: kwargs["tool_choice"] = _translate_tool_choice(tool_choice) + trace_headers = self._build_trace_headers() + if trace_headers: + kwargs["extra_headers"] = trace_headers try: response = await self._client.chat.completions.create(**kwargs) @@ -358,6 +409,9 @@ async def stream( ) if tools: kwargs["tools"] = _translate_tools(tools) + trace_headers = self._build_trace_headers() + if trace_headers: + kwargs["extra_headers"] = trace_headers content_text = "" tool_calls: list[ToolCall] = [] diff --git a/anton/core/llm/tracing.py b/anton/core/llm/tracing.py 
new file mode 100644 index 00000000..fa85efa6 --- /dev/null +++ b/anton/core/llm/tracing.py @@ -0,0 +1,51 @@ +"""Per-turn trace identity for outbound LLM telemetry. + +`ChatSession.turn_stream` sets the active `TraceContext` for the +duration of a turn. The OpenAI provider reads it when talking to +MindsHub and attaches langfuse-style headers so every LLM call (and +any nested tool/scratchpad LLM call made within the same asyncio +task) is attributed to the same session + turn server-side. + +A `ContextVar` is used so that nested calls — `_stream_and_handle_tools`, +`generate_object` (structured output), the cerebellum's diff call, +and the scratchpad's `coding_provider` calls — all inherit the same +trace automatically without threading kwargs through every layer. + +Scope: only consumed by the OpenAI provider — by default only when its +base URL points at MindsHub; other openai-compatible endpoints (raw OpenAI, +Azure, Gemini) skip it unless `ANTON_LANGFUSE_HEADERS` is set. Non-OpenAI providers (direct Anthropic) ignore it. +""" + +from __future__ import annotations + +from contextvars import ContextVar, Token +from dataclasses import dataclass + + +@dataclass(frozen=True) +class TraceContext: + """Identifiers attached to outbound LLM calls during a turn.""" + + session_id: str | None = None + turn_id: int | None = None + harness: str | None = None + + +_trace_ctx: ContextVar[TraceContext | None] = ContextVar( + "anton_trace_ctx", default=None +) + + +def get_trace_context() -> TraceContext | None: + """Return the active trace context, or None if no turn is in flight.""" + return _trace_ctx.get() + + +def set_trace_context(ctx: TraceContext | None) -> Token: + """Install a trace context for the current task; pair with `reset_trace_context`.""" + return _trace_ctx.set(ctx) + + +def reset_trace_context(token: Token) -> None: + """Restore the previous trace context. 
Pass the token returned by `set_trace_context`.""" + _trace_ctx.reset(token) diff --git a/anton/core/session.py b/anton/core/session.py index d012718f..4ef3713d 100644 --- a/anton/core/session.py +++ b/anton/core/session.py @@ -24,6 +24,11 @@ StreamToolResult, TokenLimitExceeded, ) +from anton.core.llm.tracing import ( + TraceContext, + reset_trace_context, + set_trace_context, +) from anton.core.backends.manager import ScratchpadManager from anton.core.tools.registry import ToolRegistry from anton.core.tools.tool_defs import ( @@ -81,6 +86,11 @@ class ChatSessionConfig: initial_history: list[dict] | None = None history_store: HistoryStore | None = None session_id: str | None = None + # Identifier for the host harness driving this session (e.g. "cowork", + # "cli"). Surfaced on telemetry / langfuse traces so the harness that + # produced a given trace is filterable in the dashboard. None means the + # host didn't identify itself. + harness: str | None = None proactive_dashboards: bool = False tools: list[ToolDef] = field(default_factory=list) @@ -117,6 +127,11 @@ def __init__(self, config: ChatSessionConfig) -> None: ) self._history_store = config.history_store self._session_id = config.session_id + self._harness = config.harness + # Set per-turn by `turn_stream` so any LLM call made during that + # turn can read the current turn identifier (used by telemetry / + # langfuse propagation in the provider layer). + self._current_turn_id: int | None = None self._cancel_event = asyncio.Event() self._escape_watcher: EscapeWatcher | None = None self._active_datasource: str | None = None @@ -1077,9 +1092,20 @@ async def turn(self, user_input: str | list[dict]) -> str: return reply async def turn_stream( - self, user_input: str | list[dict] + self, + user_input: str | list[dict], + *, + turn_id: int | None = None, ) -> AsyncIterator[StreamEvent]: - """Streaming version of turn(). Yields events as they arrive.""" + """Streaming version of turn(). Yields events as they arrive. 
+ + `turn_id` lets the host (cowork, CLI, …) tag the turn with its + own identifier so downstream telemetry can correlate the LLM + calls + tool spans made during this turn. Stored on + `self._current_turn_id`; the provider layer reads the id from the + `TraceContext` installed for this turn, not from a threaded arg. + """ + self._current_turn_id = turn_id self._append_history({"role": "user", "content": user_input}) # Log user input to episodic memory @@ -1099,6 +1125,21 @@ async def turn_stream( user_message=user_msg_str, ) + # Per-turn trace identity. The OpenAI provider reads this when + # talking to MindsHub and attaches langfuse-style headers so the + # router can attribute every LLM call (and any spans nested + # inside this turn via tools / scratchpad) to the right session. + # ContextVar propagation also covers `asyncio.create_task` spawns + # — the cerebellum flush + identity extraction tasks scheduled + # below inherit a copy of this context. + _trace_token = set_trace_context( + TraceContext( + session_id=self._session_id, + turn_id=turn_id if turn_id is not None else self._turn_count + 1, + harness=self._harness, + ) + ) + try: while True: try: @@ -1174,6 +1215,7 @@ async def turn_stream( self._active_explainability.finalize( "".join(assistant_text_parts)[:2000] ) + reset_trace_context(_trace_token) # Log assistant response to episodic memory if self._episodic is not None and assistant_text_parts: From bc275fbb4fe7966c14c2a491e08fb39da2751631 Mon Sep 17 00:00:00 2001 From: Max Abouchar Date: Fri, 15 May 2026 15:33:02 -0700 Subject: [PATCH 2/2] document how to opt in for tracing in openai-compatible --- README.md | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/README.md b/README.md index e0deed43..4b4770d4 100644 --- a/README.md +++ b/README.md @@ -183,6 +183,21 @@ ANTON_ANALYTICS_ENABLED=false --- +## Trace headers +When the planning provider is openai-compatible Anton can attach `Langfuse-Session-Id`, `Langfuse-Tags`, and `Langfuse-Metadata` headers 
so the router can attribute traces. By default the headers are sent only when the provider's base URL points at MindsHub (it contains `mindshub.ai` or `mdb.ai`). To enable the same headers against any other openai-compatible endpoint (e.g. a self-hosted Langfuse proxy in front of ollama or vLLM), set: + +```bash +export ANTON_LANGFUSE_HEADERS=1 +``` + +Or add it to your workspace config (`.anton/.env`): + +``` +ANTON_LANGFUSE_HEADERS=1 +``` + +--- + ## Dev guidelines We use three long-lived branches: `dev` → `staging` → `main`.