diff --git a/src/agentevals/api/models.py b/src/agentevals/api/models.py index 1acc0ee..4b3aebc 100644 --- a/src/agentevals/api/models.py +++ b/src/agentevals/api/models.py @@ -180,6 +180,12 @@ class WSSessionCompleteEvent(CamelModel): invocations: list[dict[str, Any]] +class WSSessionRemovedEvent(CamelModel): + type: str = "session_removed" + session_id: str + absorbed_by: str + + class WSSpanReceivedEvent(CamelModel): type: str = "span_received" session_id: str diff --git a/src/agentevals/api/otlp_routes.py b/src/agentevals/api/otlp_routes.py index af99873..f4dba27 100644 --- a/src/agentevals/api/otlp_routes.py +++ b/src/agentevals/api/otlp_routes.py @@ -96,6 +96,9 @@ async def _process_traces(body: dict, manager: StreamingTraceManager) -> None: resource_attrs = resource_span.get("resource", {}).get("attributes", []) metadata = _extract_agentevals_metadata(resource_attrs) + if not metadata.get("conversation_id"): + metadata["conversation_id"] = _prescan_conversation_id(resource_span) + for scope_span in resource_span.get("scopeSpans", []): scope = scope_span.get("scope", {}) scope_name = scope.get("name", "") @@ -258,6 +261,21 @@ def _extract_agentevals_metadata(resource_attrs: list[dict]) -> dict: } +def _prescan_conversation_id(resource_span: dict) -> str | None: + """Pre-scan all spans in a resourceSpan batch for gen_ai.conversation.id. + + Within a single OTLP batch, some scopes (e.g. A2A server instrumentation) + may lack conversation_id while others (agent instrumentation) have it. + Scanning upfront ensures ALL spans in the batch route to the same session. + """ + for scope_span in resource_span.get("scopeSpans", []): + for span_data in scope_span.get("spans", []): + conv_id = _extract_conversation_id(span_data.get("attributes", [])) + if conv_id: + return conv_id + return None + + def _extract_conversation_id(attrs_list: list[dict]) -> str | None: """Extract gen_ai.conversation.id from OTLP span attributes.""" for attr in attrs_list: diff --git a/src/agentevals/streaming/ws_server.py b/src/agentevals/streaming/ws_server.py index 45790b8..5b90b86 100644 --- a/src/agentevals/streaming/ws_server.py +++ b/src/agentevals/streaming/ws_server.py @@ -15,6 +15,7 @@ from ..api.models import ( SessionInfo, WSSessionCompleteEvent, + WSSessionRemovedEvent, WSSessionStartedEvent, WSSpanReceivedEvent, ) @@ -227,14 +228,21 @@ async def get_or_create_otlp_session(self, trace_id: str, metadata: dict) -> Tra active = self.sessions.get(active_id) if active and not active.is_complete: active.trace_ids.add(trace_id) + if conversation_id: + await self._absorb_orphan_for_trace(trace_id, active) return active if active and active.is_complete and conversation_id: self._reopen_session(active, trace_id, session_name) + await self._absorb_orphan_for_trace(trace_id, active) return active existing = self.find_session_by_trace_id(trace_id) - if existing and existing.is_complete: - self._reopen_session(existing, trace_id, session_name) + if existing: + if existing.is_complete: + self._reopen_session(existing, trace_id, session_name) + else: + existing.trace_ids.add(trace_id) + self._active_session_for_name[session_name] = existing.session_id return existing session_id = session_name @@ -349,6 +357,55 @@ def _reopen_session(self, session: TraceSession, trace_id: str, session_name: st len(session.spans), ) + async def _absorb_orphan_for_trace(self, trace_id: str, target: TraceSession) -> None: + """Merge an orphan session into the target when conversation_id is discovered. + + When infrastructure spans (no conversation_id) arrive before agent spans, + they create a separate session keyed by trace_id. Once the conversation_id + is known and routes to the correct session, the orphan's data is merged + and the orphan session is removed. + """ + orphan = None + orphan_id = None + for sid, session in self.sessions.items(): + if sid == target.session_id: + continue + if trace_id in session.trace_ids: + orphan = session + orphan_id = sid + break + + if not orphan: + return + + target.spans.extend(orphan.spans) + target.logs.extend(orphan.logs) + target.trace_ids.update(orphan.trace_ids) + if orphan.has_root_span: + target.has_root_span = True + + del self.sessions[orphan_id] + for name, mapped_id in list(self._active_session_for_name.items()): + if mapped_id == orphan_id: + del self._active_session_for_name[name] + for timer_map in (self._completion_timers, self._idle_timers): + if orphan_id in timer_map: + timer_map.pop(orphan_id).cancel() + self.incremental_extractors.pop(orphan_id, None) + + await self.broadcast_to_ui( + WSSessionRemovedEvent( + session_id=orphan_id, + absorbed_by=target.session_id, + ).model_dump(by_alias=True) + ) + logger.info( + "Absorbed orphan session %s (%d spans) into %s", + orphan_id, + len(orphan.spans), + target.session_id, + ) + async def _delayed_complete(self, session_id: str, delay: float) -> None: await asyncio.sleep(delay) await self._complete_otlp_session(session_id) diff --git a/ui/src/components/streaming/LiveStreamingView.tsx b/ui/src/components/streaming/LiveStreamingView.tsx index 2024b33..85d229b 100644 --- a/ui/src/components/streaming/LiveStreamingView.tsx +++ b/ui/src/components/streaming/LiveStreamingView.tsx @@ -318,6 +318,17 @@ export function LiveStreamingView() { }); break; + case 'session_removed': + if (import.meta.env.DEV) { + console.log('[Streaming] Session removed:', data.sessionId, 'absorbed by:', data.absorbedBy); + } + setActiveSessions(prev => { + const newMap = new Map(prev); + newMap.delete(data.sessionId); + return newMap; + }); + break; + case 'session_complete': if (import.meta.env.DEV) { console.log('[Streaming] Session complete with invocations:', data.invocations?.length);