Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 53 additions & 7 deletions python/packages/core/agent_framework/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import base64
import contextlib
import json
import logging
import re
Expand Down Expand Up @@ -2890,6 +2891,7 @@ def __init__(
self._inner_stream_source: ResponseStream[Any, Any] | Awaitable[ResponseStream[Any, Any]] | None = None
self._wrap_inner: bool = False
self._map_update: Callable[[Any], UpdateT | Awaitable[UpdateT]] | None = None
self._pull_context_manager_factories: list[Callable[[], contextlib.AbstractContextManager[Any]]] = []

def map(
self,
Expand Down Expand Up @@ -3008,11 +3010,18 @@ def __aiter__(self) -> ResponseStream[UpdateT, FinalT]:
return self

async def __anext__(self) -> UpdateT:
if self._iterator is None:
stream = await self._get_stream()
self._iterator = stream.__aiter__()
try:
update: UpdateT = await self._iterator.__anext__()
with contextlib.ExitStack() as stack:
for factory in self._pull_context_manager_factories:
stack.enter_context(factory())
# Resolve the underlying stream inside the pull contexts so that any
# spans/contexts created during stream resolution (e.g. inner chat
# completion spans created on the first pull of a wrapped agent stream)
# inherit the active context (e.g. an outer agent invoke span).
if self._iterator is None:
stream = await self._get_stream()
self._iterator = stream.__aiter__()
update: UpdateT = await self._iterator.__anext__()
except StopAsyncIteration:
self._consumed = True
await self._run_cleanup_hooks()
Expand All @@ -3038,9 +3047,25 @@ async def __anext__(self) -> UpdateT:
update = hooked
return update

async def _resolve_stream_with_pull_contexts(self) -> AsyncIterable[UpdateT]:
    """Return the underlying stream, resolving it under the registered pull contexts.

    If the stream has not been resolved yet (``self._stream`` is ``None``), every
    factory in ``self._pull_context_manager_factories`` is called and the resulting
    context managers are entered in registration order before ``_get_stream`` is
    awaited; they are exited again as soon as that await completes or raises. Any
    spans/contexts created while the source is being resolved therefore see the
    same active context as iterator pulls do.

    If the stream is already resolved, no factory is called and ``_get_stream`` is
    simply awaited.

    ``__await__`` and ``get_final_response`` go through this helper. ``__anext__``
    does not: it enters the pull contexts in its own ``ExitStack`` and calls
    ``_get_stream`` directly.
    """
    with contextlib.ExitStack() as pull_contexts:
        # Only an unresolved stream needs the pull contexts; for a resolved one
        # the stack stays empty and the block below is a plain await.
        if self._stream is None:
            for make_context in self._pull_context_manager_factories:
                pull_contexts.enter_context(make_context())
        return await self._get_stream()

def __await__(self) -> Any:
    """Make the stream awaitable: ``await stream`` resolves it and returns the stream.

    The underlying stream is resolved exclusively through
    ``_resolve_stream_with_pull_contexts`` so that the registered pull context
    managers are active while the source is being resolved. ``_get_stream`` must
    not be awaited here first: doing so resolves the stream outside those
    contexts and turns the context-aware resolution into a no-op.

    Returns:
        The awaitable iterator of a coroutine that yields this same stream
        instance once resolution has finished.
    """

    async def _wrap() -> ResponseStream[UpdateT, FinalT]:
        await self._resolve_stream_with_pull_contexts()
        return self

    return _wrap().__await__()
Expand All @@ -3064,10 +3089,12 @@ async def get_final_response(self) -> FinalT:
"""
if self._wrap_inner:
if self._inner_stream is None:
# Use _get_stream() to resolve the awaitable - this properly handles
# Use _resolve_stream_with_pull_contexts() so that any spans/contexts
# created while resolving the awaitable (e.g. inner telemetry spans)
# inherit the same active context as iterator pulls. This also handles
# the case where _stream_source and _inner_stream_source are the same
# coroutine (e.g., from from_awaitable), avoiding double-await errors.
await self._get_stream()
await self._resolve_stream_with_pull_contexts()
if self._inner_stream is None:
raise RuntimeError("Inner stream not available")
if not self._finalized and not self._consumed:
Expand Down Expand Up @@ -3177,6 +3204,25 @@ def with_cleanup_hook(
self._cleanup_hooks.append(hook)
return self

def with_pull_context_manager(
    self,
    cm_factory: Callable[[], contextlib.AbstractContextManager[Any]],
) -> ResponseStream[UpdateT, FinalT]:
    """Register a context manager factory that is activated around stream pulls.

    The factory is called, and the context manager it returns is entered, in two
    situations:

    - on every ``__anext__`` call, where it wraps both the (first-time) resolution
      of the underlying stream and the await of the underlying iterator;
    - when ``__await__`` or ``get_final_response`` resolve a not-yet-resolved
      stream via ``_resolve_stream_with_pull_contexts``.

    Factories are entered in registration order through a ``contextlib.ExitStack``
    and are exited before the triggering call returns. This is useful for state
    that must be active while the inner async work runs - for example, attaching
    an OpenTelemetry span to the current context so child spans created by inner
    code (HTTP clients, tool execution) are correctly parented.

    Because each context manager is entered and exited within the same call,
    attach/detach style operations remain symmetric in the same async context
    regardless of where the stream is iterated.

    Args:
        cm_factory: Zero-argument callable returning a synchronous context
            manager. It is invoked anew for every activation, so it should
            return a fresh (or re-enterable) context manager each time.

    Returns:
        This stream, so that registrations can be chained.
    """
    self._pull_context_manager_factories.append(cm_factory)
    return self

async def _run_cleanup_hooks(self) -> None:
if self._cleanup_run:
return
Expand Down
148 changes: 104 additions & 44 deletions python/packages/core/agent_framework/observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from typing import TYPE_CHECKING, Any, ClassVar, Final, Generic, Literal, TypedDict, cast, overload

from dotenv import load_dotenv
from opentelemetry import context as otel_context
from opentelemetry import metrics, trace

from . import __version__ as version_info
Expand Down Expand Up @@ -1277,27 +1278,8 @@ def get_response(
)

if stream:
result_stream = cast(
ResponseStream[ChatResponseUpdate, ChatResponse[Any]],
super_get_response(
messages=messages,
stream=True,
options=opts,
compaction_strategy=compaction_strategy,
tokenizer=tokenizer,
function_invocation_kwargs=function_invocation_kwargs,
client_kwargs=merged_client_kwargs,
),
)
span = _start_streaming_span(attributes, OtelAttr.REQUEST_MODEL)

# Create span directly without trace.use_span() context attachment.
# Streaming spans are closed asynchronously in cleanup hooks, which run
# in a different async context than creation — using use_span() would
# cause "Failed to detach context" errors from OpenTelemetry.
operation = attributes.get(OtelAttr.OPERATION, "operation")
span_name = attributes.get(OtelAttr.REQUEST_MODEL, "unknown")
span = get_tracer().start_span(f"{operation} {span_name}")
span.set_attributes(attributes)
if OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED and messages:
_capture_messages(
span=span,
Expand All @@ -1319,6 +1301,24 @@ def _close_span() -> None:
def _record_duration() -> None:
duration_state["duration"] = perf_counter() - start_time

try:
result_stream = cast(
ResponseStream[ChatResponseUpdate, ChatResponse[Any]],
Comment thread
moonbox3 marked this conversation as resolved.
super_get_response(
messages=messages,
stream=True,
options=opts,
compaction_strategy=compaction_strategy,
tokenizer=tokenizer,
function_invocation_kwargs=function_invocation_kwargs,
client_kwargs=merged_client_kwargs,
),
)
except Exception as exception:
capture_exception(span=span, exception=exception, timestamp=time_ns())
_close_span()
raise

async def _finalize_stream() -> None:
from ._types import ChatResponse

Expand Down Expand Up @@ -1357,11 +1357,18 @@ async def _finalize_stream() -> None:
finally:
_close_span()

# Register a weak reference callback to close the span if stream is garbage collected
# without being consumed. This ensures spans don't leak if users don't consume streams.
wrapped_stream: ResponseStream[ChatResponseUpdate, ChatResponse[Any]] = result_stream.with_cleanup_hook(
_record_duration
).with_cleanup_hook(_finalize_stream)
# The pull context manager attaches the span around each underlying iterator pull so
# that child spans created during the pull (e.g. HTTP requests, inner tool execution)
# are parented under this chat span. Attach and detach happen in the same async
# context as the pull, avoiding cross-context cleanup issues. The weakref finalizer
# ensures the span is closed even if the stream is garbage collected without being
# consumed.
wrapped_stream: ResponseStream[ChatResponseUpdate, ChatResponse[Any]] = (
result_stream
.with_cleanup_hook(_record_duration)
.with_cleanup_hook(_finalize_stream)
.with_pull_context_manager(lambda: _activate_span(span))
)
weakref.finalize(wrapped_stream, _close_span)
return wrapped_stream

Expand Down Expand Up @@ -1543,23 +1550,8 @@ def _trace_agent_invocation(
inner_accumulated_usage_token = INNER_ACCUMULATED_USAGE.set({})

if stream:
try:
run_result: object = execute()
if isinstance(run_result, ResponseStream):
result_stream: ResponseStream[AgentResponseUpdate, AgentResponse[Any]] = run_result # pyright: ignore[reportUnknownVariableType]
elif isinstance(run_result, Awaitable):
result_stream = ResponseStream.from_awaitable(run_result) # type: ignore[arg-type] # pyright: ignore[reportArgumentType]
else:
raise RuntimeError("Streaming telemetry requires a ResponseStream result.")
except Exception:
INNER_RESPONSE_TELEMETRY_CAPTURED_FIELDS.reset(inner_response_telemetry_captured_fields_token)
INNER_ACCUMULATED_USAGE.reset(inner_accumulated_usage_token)
raise
span = _start_streaming_span(attributes, OtelAttr.AGENT_NAME)

operation = attributes.get(OtelAttr.OPERATION, "operation")
span_name = attributes.get(OtelAttr.AGENT_NAME, "unknown")
span = get_tracer().start_span(f"{operation} {span_name}")
span.set_attributes(attributes)
if OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED and messages:
_capture_messages(
span=span,
Expand All @@ -1581,6 +1573,21 @@ def _close_span() -> None:
def _record_duration() -> None:
duration_state["duration"] = perf_counter() - start_time
Comment thread
moonbox3 marked this conversation as resolved.

try:
run_result: object = execute()
if isinstance(run_result, ResponseStream):
result_stream: ResponseStream[AgentResponseUpdate, AgentResponse[Any]] = run_result # pyright: ignore[reportUnknownVariableType]
elif isinstance(run_result, Awaitable):
result_stream = ResponseStream.from_awaitable(run_result) # type: ignore[arg-type] # pyright: ignore[reportArgumentType]
else:
raise RuntimeError("Streaming telemetry requires a ResponseStream result.")
except Exception as exception:
capture_exception(span=span, exception=exception, timestamp=time_ns())
INNER_RESPONSE_TELEMETRY_CAPTURED_FIELDS.reset(inner_response_telemetry_captured_fields_token)
INNER_ACCUMULATED_USAGE.reset(inner_accumulated_usage_token)
_close_span()
raise
Comment thread
TaoChenOSU marked this conversation as resolved.

async def _finalize_stream() -> None:
from ._types import AgentResponse

Expand Down Expand Up @@ -1620,9 +1627,18 @@ async def _finalize_stream() -> None:
INNER_ACCUMULATED_USAGE.reset(inner_accumulated_usage_token)
_close_span()

wrapped_stream: ResponseStream[AgentResponseUpdate, AgentResponse[Any]] = result_stream.with_cleanup_hook(
_record_duration
).with_cleanup_hook(_finalize_stream)
# The pull context manager attaches the span around each underlying iterator pull so
# that child spans created during the pull (e.g. inner chat completion spans from the
# underlying ChatTelemetryLayer) are parented under this agent invoke span. Attach and
# detach happen in the same async context as the pull, avoiding cross-context cleanup
# issues. The weakref finalizer ensures the span is closed even if the stream is
# garbage collected without being consumed.
wrapped_stream: ResponseStream[AgentResponseUpdate, AgentResponse[Any]] = (
result_stream
.with_cleanup_hook(_record_duration)
.with_cleanup_hook(_finalize_stream)
.with_pull_context_manager(lambda: _activate_span(span))
)
weakref.finalize(wrapped_stream, _close_span)
return wrapped_stream

Expand Down Expand Up @@ -1809,6 +1825,27 @@ def get_function_span(
)


@contextlib.contextmanager
def _activate_span(span: trace.Span) -> Generator[None]:
    """Make ``span`` the current OpenTelemetry span for the duration of the block.

    Intended as the per-pull context manager registered on a ``ResponseStream``
    through ``with_pull_context_manager``. On entry a context carrying ``span``
    is attached; on exit (normal or via exception) the attach token is detached
    again. Child spans started inside the block (HTTP clients, inner chat
    completions, tool execution) are therefore parented under ``span``.

    Attach and detach both happen inside the same ``__anext__`` invocation, and
    hence in the same async task / contextvars context, which avoids the
    "Failed to detach context" warnings caused by cross-context cleanup.
    """
    span_context = trace.set_span_in_context(span)
    with contextlib.ExitStack() as cleanup:
        # Attach first, then schedule the matching detach with the token that
        # attach returned; nothing is scheduled if attaching itself fails.
        cleanup.callback(otel_context.detach, otel_context.attach(span_context))
        yield


@contextlib.contextmanager
def _get_span(
attributes: dict[str, Any],
Expand All @@ -1831,6 +1868,29 @@ def _get_span(
yield current_span


def _start_streaming_span(attributes: dict[str, Any], span_name_attribute: str) -> trace.Span:
    """Create the span that tracks a streaming operation, without making it current.

    The span is named ``"<operation> <name>"``, where ``<operation>`` is read from
    ``attributes[OtelAttr.OPERATION]`` (falling back to ``"operation"``) and
    ``<name>`` from ``attributes[span_name_attribute]`` (falling back to
    ``"unknown"``). All of ``attributes`` are then set on the span.

    In contrast to :func:`_get_span`, the span is started with ``start_span`` and
    is not attached to the current OpenTelemetry context. That leaves two duties
    to the caller:

    - end the span from cleanup hooks on the wrapped
      :class:`~agent_framework._types.ResponseStream`;
    - activate the span around each iterator pull by registering
      :func:`_activate_span` with ``with_pull_context_manager``, so that child
      spans created while the stream is produced get this span as their parent.

    The span is deliberately not attached at creation time: streaming spans are
    closed asynchronously in cleanup hooks that run in a different async context
    than the one that created them, and detaching there would trigger
    "Failed to detach context" errors from OpenTelemetry.

    Args:
        attributes: Span attributes; also the source of the span name parts.
        span_name_attribute: Key in ``attributes`` holding the second part of
            the span name (for example the model or agent name attribute).

    Returns:
        The started, non-current span.
    """
    display_name = (
        f"{attributes.get(OtelAttr.OPERATION, 'operation')} "
        f"{attributes.get(span_name_attribute, 'unknown')}"
    )
    streaming_span = get_tracer().start_span(display_name)
    streaming_span.set_attributes(attributes)
    return streaming_span


def _get_instructions_from_options(options: Any) -> str | list[str] | None:
"""Extract instructions from options dict."""
if options is None:
Expand Down
Loading
Loading