Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,21 @@ ANTON_ANALYTICS_ENABLED=false

---

## Trace headers
When the planning provider is openai-compatible Anton can attach `Langfuse-Session-Id`, `Langfuse-Tags`, and `Langfuse-Metadata` headers so the router can attribute traces. To enable the same headers against any other openai-compatible endpoint (e.g. a self-hosted Langfuse proxy in front of ollama or vLLM), set:

```bash
export ANTON_LANGFUSE_HEADERS=1
```

Or add it to your workspace config (`.anton/.env`):

```
ANTON_LANGFUSE_HEADERS=1
```

---

## Dev guidelines

We use three long-lived branches: `dev` → `staging` → `main`.
Expand Down
54 changes: 54 additions & 0 deletions anton/core/llm/openai.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import json
import os
from collections.abc import AsyncIterator

import openai
Expand Down Expand Up @@ -222,6 +223,24 @@ def __init__(
self._ssl_verify = ssl_verify
self._api_version = api_version
self._supports_vision = supports_vision
# Whether to attach langfuse-style headers (Langfuse-Session-Id,
# Langfuse-Tags, Langfuse-Metadata) to outbound requests. Default-on
# only for the MindsHub-backed deployment, which is the curated
# langfuse-aware router we ship against. For every other openai-
# compatible endpoint (raw OpenAI, Azure, Gemini, self-hosted
# vLLM/ollama/LM Studio) we skip by default so the cowork session
# identity doesn't leak into third-party logs.
#
# Power-user opt-in: set `ANTON_LANGFUSE_HEADERS=1` to force-emit
# the headers regardless of base URL — useful when the user has
# pointed `base_url` at their own langfuse-instrumented proxy.
self._emit_trace_headers = bool(base_url) and (
"mindshub.ai" in base_url or "mdb.ai" in base_url
)
if os.environ.get("ANTON_LANGFUSE_HEADERS", "").strip().lower() in {
"1", "true", "yes", "on",
}:
self._emit_trace_headers = True

import httpx

Expand Down Expand Up @@ -255,6 +274,35 @@ def export_connection_info(self) -> ProviderConnectionInfo:
api_version=self._api_version,
)

def _build_trace_headers(self) -> dict[str, str] | None:
"""Return langfuse-style headers for the active trace, or None.

Returns None unless trace-header emission is enabled for this
provider instance (default-on for MindsHub, opt-in for any other
openai-compatible endpoint via `ANTON_LANGFUSE_HEADERS=1`) AND a
`TraceContext` has been installed by `ChatSession.turn_stream`.
"""
if not self._emit_trace_headers:
return None
from .tracing import get_trace_context

ctx = get_trace_context()
if ctx is None:
return None
headers: dict[str, str] = {}
if ctx.session_id:
headers["Langfuse-Session-Id"] = ctx.session_id
if ctx.harness:
headers["Langfuse-Tags"] = ctx.harness
extra: dict[str, object] = {}
if ctx.turn_id is not None:
extra["turn_id"] = ctx.turn_id
if ctx.harness:
extra["harness"] = ctx.harness
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this set twice, its already in Langfuse-tags?

if extra:
headers["Langfuse-Metadata"] = json.dumps(extra)
return headers or None

async def complete(
self,
*,
Expand All @@ -276,6 +324,9 @@ async def complete(
kwargs["tools"] = _translate_tools(tools)
if tool_choice:
kwargs["tool_choice"] = _translate_tool_choice(tool_choice)
trace_headers = self._build_trace_headers()
if trace_headers:
kwargs["extra_headers"] = trace_headers

try:
response = await self._client.chat.completions.create(**kwargs)
Expand Down Expand Up @@ -358,6 +409,9 @@ async def stream(
)
if tools:
kwargs["tools"] = _translate_tools(tools)
trace_headers = self._build_trace_headers()
if trace_headers:
kwargs["extra_headers"] = trace_headers

content_text = ""
tool_calls: list[ToolCall] = []
Expand Down
51 changes: 51 additions & 0 deletions anton/core/llm/tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""Per-turn trace identity for outbound LLM telemetry.

`ChatSession.turn_stream` sets the active `TraceContext` for the
duration of a turn. The OpenAI provider reads it when talking to
MindsHub and attaches langfuse-style headers so every LLM call (and
any nested tool/scratchpad LLM call made within the same asyncio
task) is attributed to the same session + turn server-side.

A `ContextVar` is used so that nested calls — `_stream_and_handle_tools`,
`generate_object` (structured output), the cerebellum's diff call,
and the scratchpad's `coding_provider` calls — all inherit the same
trace automatically without threading kwargs through every layer.

Scope: only consumed by the OpenAI provider when its base URL points
at MindsHub. Other providers (direct Anthropic, raw OpenAI, Azure,
Gemini) ignore the context entirely.
"""

from __future__ import annotations

from contextvars import ContextVar, Token
from dataclasses import dataclass


@dataclass(frozen=True)
class TraceContext:
"""Identifiers attached to outbound LLM calls during a turn."""

session_id: str | None = None
turn_id: int | None = None
harness: str | None = None


_trace_ctx: ContextVar[TraceContext | None] = ContextVar(
"anton_trace_ctx", default=None
)


def get_trace_context() -> TraceContext | None:
"""Return the active trace context, or None if no turn is in flight."""
return _trace_ctx.get()


def set_trace_context(ctx: TraceContext | None) -> Token:
"""Install a trace context for the current task; pair with `reset_trace_context`."""
return _trace_ctx.set(ctx)


def reset_trace_context(token: Token) -> None:
"""Restore the previous trace context. Pass the token returned by `set_trace_context`."""
_trace_ctx.reset(token)
46 changes: 44 additions & 2 deletions anton/core/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
StreamToolResult,
TokenLimitExceeded,
)
from anton.core.llm.tracing import (
TraceContext,
reset_trace_context,
set_trace_context,
)
from anton.core.backends.manager import ScratchpadManager
from anton.core.tools.registry import ToolRegistry
from anton.core.tools.tool_defs import (
Expand Down Expand Up @@ -81,6 +86,11 @@ class ChatSessionConfig:
initial_history: list[dict] | None = None
history_store: HistoryStore | None = None
session_id: str | None = None
# Identifier for the host harness driving this session (e.g. "cowork",
# "cli"). Surfaced on telemetry / langfuse traces so the harness that
# produced a given trace is filterable in the dashboard. None means the
# host didn't identify itself.
harness: str | None = None
proactive_dashboards: bool = False
tools: list[ToolDef] = field(default_factory=list)

Expand Down Expand Up @@ -117,6 +127,11 @@ def __init__(self, config: ChatSessionConfig) -> None:
)
self._history_store = config.history_store
self._session_id = config.session_id
self._harness = config.harness
# Set per-turn by `turn_stream` so any LLM call made during that
# turn can read the current turn identifier (used by telemetry /
# langfuse propagation in the provider layer).
self._current_turn_id: int | None = None
self._cancel_event = asyncio.Event()
self._escape_watcher: EscapeWatcher | None = None
self._active_datasource: str | None = None
Expand Down Expand Up @@ -1077,9 +1092,20 @@ async def turn(self, user_input: str | list[dict]) -> str:
return reply

async def turn_stream(
self, user_input: str | list[dict]
self,
user_input: str | list[dict],
*,
turn_id: int | None = None,
) -> AsyncIterator[StreamEvent]:
"""Streaming version of turn(). Yields events as they arrive."""
"""Streaming version of turn(). Yields events as they arrive.

`turn_id` lets the host (cowork, CLI, …) tag the turn with its
own identifier so downstream telemetry can correlate the LLM
calls + tool spans made during this turn. Stored on
`self._current_turn_id` so the provider layer can read it
without threading the arg through every internal call.
"""
self._current_turn_id = turn_id
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this used somwhere? if not we can remove it

self._append_history({"role": "user", "content": user_input})

# Log user input to episodic memory
Expand All @@ -1099,6 +1125,21 @@ async def turn_stream(
user_message=user_msg_str,
)

# Per-turn trace identity. The OpenAI provider reads this when
# talking to MindsHub and attaches langfuse-style headers so the
# router can attribute every LLM call (and any spans nested
# inside this turn via tools / scratchpad) to the right session.
# ContextVar propagation also covers `asyncio.create_task` spawns
# — the cerebellum flush + identity extraction tasks scheduled
# below inherit a copy of this context.
_trace_token = set_trace_context(
TraceContext(
session_id=self._session_id,
turn_id=turn_id if turn_id is not None else self._turn_count + 1,
harness=self._harness,
)
)

try:
while True:
try:
Expand Down Expand Up @@ -1174,6 +1215,7 @@ async def turn_stream(
self._active_explainability.finalize(
"".join(assistant_text_parts)[:2000]
)
reset_trace_context(_trace_token)

# Log assistant response to episodic memory
if self._episodic is not None and assistant_text_parts:
Expand Down
Loading