From 77088e84236394413723354d768a60bfe43d32f3 Mon Sep 17 00:00:00 2001 From: krisztianfekete Date: Tue, 31 Mar 2026 18:02:30 +0200 Subject: [PATCH] improve session groupping --- src/agentevals/api/otlp_routes.py | 14 ++++++++++++++ src/agentevals/streaming/ws_server.py | 12 +++++++++--- src/agentevals/trace_attrs.py | 1 + 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/src/agentevals/api/otlp_routes.py b/src/agentevals/api/otlp_routes.py index 5298aaa..af99873 100644 --- a/src/agentevals/api/otlp_routes.py +++ b/src/agentevals/api/otlp_routes.py @@ -26,6 +26,7 @@ from ..extraction import flatten_otlp_attributes from ..trace_attrs import ( + OTEL_GENAI_CONVERSATION_ID, OTEL_GENAI_INPUT_MESSAGES, OTEL_GENAI_OUTPUT_MESSAGES, OTEL_SCOPE, @@ -107,6 +108,11 @@ async def _process_traces(body: dict, manager: StreamingTraceManager) -> None: if not trace_id: continue + if not metadata.get("conversation_id"): + conversation_id = _extract_conversation_id(span.get("attributes", [])) + if conversation_id: + metadata["conversation_id"] = conversation_id + session = await manager.get_or_create_otlp_session(trace_id, metadata) if not session.can_accept_span(): @@ -252,6 +258,14 @@ def _extract_agentevals_metadata(resource_attrs: list[dict]) -> dict: } +def _extract_conversation_id(attrs_list: list[dict]) -> str | None: + """Extract gen_ai.conversation.id from OTLP span attributes.""" + for attr in attrs_list: + if attr.get("key") == OTEL_GENAI_CONVERSATION_ID: + return attr.get("value", {}).get("stringValue") + return None + + def _convert_otlp_log_record(log_record: dict) -> dict | None: """Convert OTLP log record to internal log event format. diff --git a/src/agentevals/streaming/ws_server.py b/src/agentevals/streaming/ws_server.py index 187dab6..45790b8 100644 --- a/src/agentevals/streaming/ws_server.py +++ b/src/agentevals/streaming/ws_server.py @@ -212,12 +212,15 @@ def _replay_orphan_logs(self, session: TraceSession) -> list[dict]: async def get_or_create_otlp_session(self, trace_id: str, metadata: dict) -> TraceSession: """Get existing session for trace_id or create a new one (OTLP path). - Groups spans by session_name (from resource attributes), not by trace_id. + Groups spans by session_name (from resource attributes) or + gen_ai.conversation.id (OTel semconv), not by trace_id. A single session can contain spans from multiple traces — this is common with GenAI semconv instrumentation where each LLM call creates its own - independent trace. + independent trace, and with multi-turn agent conversations where each + turn produces a separate trace sharing the same conversation ID. """ - session_name = metadata.get("session_name") or f"otlp-{trace_id[:12]}" + conversation_id = metadata.get("conversation_id") + session_name = metadata.get("session_name") or conversation_id or f"otlp-{trace_id[:12]}" active_id = self._active_session_for_name.get(session_name) if active_id: @@ -225,6 +228,9 @@ async def get_or_create_otlp_session(self, trace_id: str, metadata: dict) -> Tra if active and not active.is_complete: active.trace_ids.add(trace_id) return active + if active and active.is_complete and conversation_id: + self._reopen_session(active, trace_id, session_name) + return active existing = self.find_session_by_trace_id(trace_id) if existing and existing.is_complete: diff --git a/src/agentevals/trace_attrs.py b/src/agentevals/trace_attrs.py index 8c6c775..37ea351 100644 --- a/src/agentevals/trace_attrs.py +++ b/src/agentevals/trace_attrs.py @@ -23,6 +23,7 @@ OTEL_GENAI_TOOL_CALL_ID = "gen_ai.tool.call.id" OTEL_GENAI_TOOL_CALL_ARGUMENTS = "gen_ai.tool.call.arguments" OTEL_GENAI_TOOL_CALL_RESULT = "gen_ai.tool.call.result" +OTEL_GENAI_CONVERSATION_ID = "gen_ai.conversation.id" # ADK-specific custom attributes (gcp.vertex.agent.*) ADK_LLM_REQUEST = "gcp.vertex.agent.llm_request"