Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/agentevals/api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,12 @@ class WSSessionCompleteEvent(CamelModel):
invocations: list[dict[str, Any]]


class WSSessionRemovedEvent(CamelModel):
    """WebSocket event announcing that a session was removed because another session absorbed it."""

    # Event discriminator; the UI's message handler switches on this value.
    type: str = "session_removed"
    # ID of the session that was deleted.
    session_id: str
    # ID of the session that received the removed session's data.
    absorbed_by: str


class WSSpanReceivedEvent(CamelModel):
type: str = "span_received"
session_id: str
Expand Down
18 changes: 18 additions & 0 deletions src/agentevals/api/otlp_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ async def _process_traces(body: dict, manager: StreamingTraceManager) -> None:
resource_attrs = resource_span.get("resource", {}).get("attributes", [])
metadata = _extract_agentevals_metadata(resource_attrs)

if not metadata.get("conversation_id"):
metadata["conversation_id"] = _prescan_conversation_id(resource_span)

for scope_span in resource_span.get("scopeSpans", []):
scope = scope_span.get("scope", {})
scope_name = scope.get("name", "")
Expand Down Expand Up @@ -258,6 +261,21 @@ def _extract_agentevals_metadata(resource_attrs: list[dict]) -> dict:
}


def _prescan_conversation_id(resource_span: dict) -> str | None:
"""Pre-scan all spans in a resourceSpan batch for gen_ai.conversation.id.

Within a single OTLP batch, some scopes (e.g. A2A server instrumentation)
may lack conversation_id while others (agent instrumentation) have it.
Scanning upfront ensures ALL spans in the batch route to the same session.
"""
for scope_span in resource_span.get("scopeSpans", []):
for span_data in scope_span.get("spans", []):
conv_id = _extract_conversation_id(span_data.get("attributes", []))
if conv_id:
return conv_id
return None


def _extract_conversation_id(attrs_list: list[dict]) -> str | None:
"""Extract gen_ai.conversation.id from OTLP span attributes."""
for attr in attrs_list:
Expand Down
61 changes: 59 additions & 2 deletions src/agentevals/streaming/ws_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from ..api.models import (
SessionInfo,
WSSessionCompleteEvent,
WSSessionRemovedEvent,
WSSessionStartedEvent,
WSSpanReceivedEvent,
)
Expand Down Expand Up @@ -227,14 +228,21 @@ async def get_or_create_otlp_session(self, trace_id: str, metadata: dict) -> Tra
active = self.sessions.get(active_id)
if active and not active.is_complete:
active.trace_ids.add(trace_id)
if conversation_id:
await self._absorb_orphan_for_trace(trace_id, active)
return active
if active and active.is_complete and conversation_id:
self._reopen_session(active, trace_id, session_name)
await self._absorb_orphan_for_trace(trace_id, active)
return active

existing = self.find_session_by_trace_id(trace_id)
if existing and existing.is_complete:
self._reopen_session(existing, trace_id, session_name)
if existing:
if existing.is_complete:
self._reopen_session(existing, trace_id, session_name)
else:
existing.trace_ids.add(trace_id)
self._active_session_for_name[session_name] = existing.session_id
return existing

session_id = session_name
Expand Down Expand Up @@ -349,6 +357,55 @@ def _reopen_session(self, session: TraceSession, trace_id: str, session_name: st
len(session.spans),
)

async def _absorb_orphan_for_trace(self, trace_id: str, target: TraceSession) -> None:
    """Fold a stray session that tracks ``trace_id`` into ``target`` and delete it.

    Infrastructure spans without a conversation_id can arrive before the
    agent spans and end up in a separate session keyed by trace_id. Once the
    conversation_id routes the trace to ``target``, that stray session's
    spans, logs, trace ids and root-span flag are merged into ``target``,
    all manager state keyed by the stray session is dropped, and a
    ``session_removed`` event is broadcast to the UI.

    Only the first session (other than ``target``) that lists ``trace_id``
    is absorbed; when there is none, the call does nothing.
    """
    # First session other than the target that already tracks this trace.
    hit = next(
        (
            (key, candidate)
            for key, candidate in self.sessions.items()
            if key != target.session_id and trace_id in candidate.trace_ids
        ),
        None,
    )
    orphan_id, orphan = hit if hit is not None else (None, None)
    if not orphan:
        return

    # Merge the orphan's collected data into the target session.
    target.spans.extend(orphan.spans)
    target.logs.extend(orphan.logs)
    target.trace_ids.update(orphan.trace_ids)
    if orphan.has_root_span:
        target.has_root_span = True

    # Remove every piece of bookkeeping that still points at the orphan.
    del self.sessions[orphan_id]
    stale_names = [
        name
        for name, mapped_id in self._active_session_for_name.items()
        if mapped_id == orphan_id
    ]
    for name in stale_names:
        del self._active_session_for_name[name]
    for pending_timers in (self._completion_timers, self._idle_timers):
        if orphan_id in pending_timers:
            # Cancel so the pending timer cannot fire for a deleted session.
            pending_timers.pop(orphan_id).cancel()
    self.incremental_extractors.pop(orphan_id, None)

    # Tell connected UIs the orphan is gone and which session took it over.
    removed_event = WSSessionRemovedEvent(
        session_id=orphan_id,
        absorbed_by=target.session_id,
    )
    await self.broadcast_to_ui(removed_event.model_dump(by_alias=True))
    logger.info(
        "Absorbed orphan session %s (%d spans) into %s",
        orphan_id,
        len(orphan.spans),
        target.session_id,
    )

async def _delayed_complete(self, session_id: str, delay: float) -> None:
await asyncio.sleep(delay)
await self._complete_otlp_session(session_id)
Expand Down
11 changes: 11 additions & 0 deletions ui/src/components/streaming/LiveStreamingView.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,17 @@ export function LiveStreamingView() {
});
break;

case 'session_removed':
if (import.meta.env.DEV) {
console.log('[Streaming] Session removed:', data.sessionId, 'absorbed by:', data.absorbedBy);
}
setActiveSessions(prev => {
const newMap = new Map(prev);
newMap.delete(data.sessionId);
return newMap;
});
break;

case 'session_complete':
if (import.meta.env.DEV) {
console.log('[Streaming] Session complete with invocations:', data.invocations?.length);
Expand Down