Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/agentevals/api/otlp_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

from ..extraction import flatten_otlp_attributes
from ..trace_attrs import (
OTEL_GENAI_CONVERSATION_ID,
OTEL_GENAI_INPUT_MESSAGES,
OTEL_GENAI_OUTPUT_MESSAGES,
OTEL_SCOPE,
Expand Down Expand Up @@ -107,6 +108,11 @@ async def _process_traces(body: dict, manager: StreamingTraceManager) -> None:
if not trace_id:
continue

if not metadata.get("conversation_id"):
conversation_id = _extract_conversation_id(span.get("attributes", []))
if conversation_id:
metadata["conversation_id"] = conversation_id

session = await manager.get_or_create_otlp_session(trace_id, metadata)

if not session.can_accept_span():
Expand Down Expand Up @@ -252,6 +258,14 @@ def _extract_agentevals_metadata(resource_attrs: list[dict]) -> dict:
}


def _extract_conversation_id(attrs_list: list[dict]) -> str | None:
"""Extract gen_ai.conversation.id from OTLP span attributes."""
for attr in attrs_list:
if attr.get("key") == OTEL_GENAI_CONVERSATION_ID:
return attr.get("value", {}).get("stringValue")
return None


def _convert_otlp_log_record(log_record: dict) -> dict | None:
"""Convert OTLP log record to internal log event format.

Expand Down
12 changes: 9 additions & 3 deletions src/agentevals/streaming/ws_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,19 +212,25 @@ def _replay_orphan_logs(self, session: TraceSession) -> list[dict]:
async def get_or_create_otlp_session(self, trace_id: str, metadata: dict) -> TraceSession:
"""Get existing session for trace_id or create a new one (OTLP path).

Groups spans by session_name (from resource attributes), not by trace_id.
Groups spans by session_name (from resource attributes) or
gen_ai.conversation.id (OTel semconv), not by trace_id.
A single session can contain spans from multiple traces — this is common
with GenAI semconv instrumentation where each LLM call creates its own
independent trace.
independent trace, and with multi-turn agent conversations where each
turn produces a separate trace sharing the same conversation ID.
"""
session_name = metadata.get("session_name") or f"otlp-{trace_id[:12]}"
conversation_id = metadata.get("conversation_id")
session_name = metadata.get("session_name") or conversation_id or f"otlp-{trace_id[:12]}"

active_id = self._active_session_for_name.get(session_name)
if active_id:
active = self.sessions.get(active_id)
if active and not active.is_complete:
active.trace_ids.add(trace_id)
return active
if active and active.is_complete and conversation_id:
self._reopen_session(active, trace_id, session_name)
return active

existing = self.find_session_by_trace_id(trace_id)
if existing and existing.is_complete:
Expand Down
1 change: 1 addition & 0 deletions src/agentevals/trace_attrs.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
OTEL_GENAI_TOOL_CALL_ID = "gen_ai.tool.call.id"
OTEL_GENAI_TOOL_CALL_ARGUMENTS = "gen_ai.tool.call.arguments"
OTEL_GENAI_TOOL_CALL_RESULT = "gen_ai.tool.call.result"
OTEL_GENAI_CONVERSATION_ID = "gen_ai.conversation.id"

# ADK-specific custom attributes (gcp.vertex.agent.*)
ADK_LLM_REQUEST = "gcp.vertex.agent.llm_request"
Expand Down