From b345761d7e3d20e701dca90ba30aba244e910ec1 Mon Sep 17 00:00:00 2001 From: droideronline Date: Mon, 13 Apr 2026 23:05:45 +0530 Subject: [PATCH 1/5] Python: Skip get_final_response in OTel _finalize_stream when stream errored When a streaming error occurs, _finalize_stream (a cleanup hook registered by AgentTelemetryLayer) was unconditionally calling get_final_response(), which triggers all registered result hooks including after_run context providers. This caused providers to fire incorrectly on error paths. Guard against this by checking result_stream._consumed: True only after StopAsyncIteration (normal completion), False when an exception was raised. The fix applies to both the chat client and agent telemetry layers. Closes #5231 --- .../packages/core/agent_framework/observability.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/python/packages/core/agent_framework/observability.py b/python/packages/core/agent_framework/observability.py index 8d2eb05136..5d5d2755fb 100644 --- a/python/packages/core/agent_framework/observability.py +++ b/python/packages/core/agent_framework/observability.py @@ -1323,6 +1323,12 @@ async def _finalize_stream() -> None: from ._types import ChatResponse try: + if not result_stream._consumed: + # Stream did not complete normally (e.g., it errored). Skip + # get_final_response() to avoid firing result hooks such as + # after_run context providers on error paths. The span is + # still closed in the finally block below. + return response: ChatResponse[Any] = await result_stream.get_final_response() duration = duration_state.get("duration") response_attributes = _get_response_attributes(attributes, response) @@ -1579,6 +1585,12 @@ async def _finalize_stream() -> None: from ._types import AgentResponse try: + if not result_stream._consumed: + # Stream did not complete normally (e.g., it errored). Skip + # get_final_response() to avoid firing result hooks such as + # after_run context providers on error paths. The span is + # still closed in the finally block below. + return response: AgentResponse[Any] = await result_stream.get_final_response() duration = duration_state.get("duration") response_attributes = _get_response_attributes( From 074d41303c5a389493fd1f6c9ec3cd61e7e3bb20 Mon Sep 17 00:00:00 2001 From: droideronline Date: Mon, 13 Apr 2026 23:18:22 +0530 Subject: [PATCH 2/5] Python: Expose consumed/stream_error on ResponseStream and capture error in OTel span Address Copilot review feedback on #5232: - Add `_stream_error: Exception | None` to ResponseStream, set in __anext__'s except branch so cleanup hooks can inspect the failure. - Expose public `consumed` and `stream_error` properties to avoid coupling observability.py to private stream internals. - Update both _finalize_stream closures (chat and agent layers) to use the public properties and call capture_exception() with the stream error before returning early, ensuring the OTel span records the failure rather than closing silently. --- python/packages/core/agent_framework/_types.py | 14 +++++++++++++- .../core/agent_framework/observability.py | 16 ++++++++++------ 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/python/packages/core/agent_framework/_types.py b/python/packages/core/agent_framework/_types.py index 87799d0848..4a05113eb0 100644 --- a/python/packages/core/agent_framework/_types.py +++ b/python/packages/core/agent_framework/_types.py @@ -2816,6 +2816,7 @@ def __init__( cleanup_hooks if cleanup_hooks is not None else [] ) self._cleanup_run: bool = False + self._stream_error: Exception | None = None self._inner_stream: ResponseStream[Any, Any] | None = None self._inner_stream_source: ResponseStream[Any, Any] | Awaitable[ResponseStream[Any, Any]] | None = None self._wrap_inner: bool = False @@ -2948,7 +2949,8 @@ async def __anext__(self) -> UpdateT: await self._run_cleanup_hooks() await self.get_final_response() raise - except Exception: + except Exception as exc: + self._stream_error = exc await self._run_cleanup_hooks() raise if self._map_update is not None: @@ -3112,6 +3114,16 @@ async def _run_cleanup_hooks(self) -> None: if isawaitable(result): await result + @property + def consumed(self) -> bool: + """True if the stream completed normally (StopAsyncIteration was reached).""" + return self._consumed + + @property + def stream_error(self) -> Exception | None: + """The exception that caused the stream to fail, or None if it completed normally.""" + return self._stream_error + @property def updates(self) -> Sequence[UpdateT]: return self._updates diff --git a/python/packages/core/agent_framework/observability.py b/python/packages/core/agent_framework/observability.py index 5d5d2755fb..eafdc332e3 100644 --- a/python/packages/core/agent_framework/observability.py +++ b/python/packages/core/agent_framework/observability.py @@ -1323,11 +1323,13 @@ async def _finalize_stream() -> None: from ._types import ChatResponse try: - if not result_stream._consumed: + if not result_stream.consumed: # Stream did not complete normally (e.g., it errored). Skip # get_final_response() to avoid firing result hooks such as - # after_run context providers on error paths. The span is - # still closed in the finally block below. + # after_run context providers on error paths. Capture the + # stream error on the span so it is not silently swallowed. + if result_stream.stream_error is not None: + capture_exception(span=span, exception=result_stream.stream_error, timestamp=time_ns()) return response: ChatResponse[Any] = await result_stream.get_final_response() duration = duration_state.get("duration") @@ -1585,11 +1587,13 @@ async def _finalize_stream() -> None: from ._types import AgentResponse try: - if not result_stream._consumed: + if not result_stream.consumed: # Stream did not complete normally (e.g., it errored). Skip # get_final_response() to avoid firing result hooks such as - # after_run context providers on error paths. The span is - # still closed in the finally block below. + # after_run context providers on error paths. Capture the + # stream error on the span so it is not silently swallowed. + if result_stream.stream_error is not None: + capture_exception(span=span, exception=result_stream.stream_error, timestamp=time_ns()) return response: AgentResponse[Any] = await result_stream.get_final_response() duration = duration_state.get("duration") From e59e880064b9883f9322040241986e17f98b775b Mon Sep 17 00:00:00 2001 From: droideronline Date: Mon, 13 Apr 2026 23:28:56 +0530 Subject: [PATCH 3/5] Python: Address Copilot review feedback on stream error handling - Use stream_error is not None as the guard in _finalize_stream instead of not consumed, so the early-return path is keyed precisely to actual errors rather than any non-normal completion state. - Clear _stream_error after _run_cleanup_hooks() completes to avoid retaining the exception traceback (and any large object graphs it references) on the stream instance beyond the cleanup phase. --- .../packages/core/agent_framework/_types.py | 5 +++- .../core/agent_framework/observability.py | 24 ++++++++----------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/python/packages/core/agent_framework/_types.py b/python/packages/core/agent_framework/_types.py index 4a05113eb0..08a1dd3efc 100644 --- a/python/packages/core/agent_framework/_types.py +++ b/python/packages/core/agent_framework/_types.py @@ -2951,7 +2951,10 @@ async def __anext__(self) -> UpdateT: raise except Exception as exc: self._stream_error = exc - await self._run_cleanup_hooks() + try: + await self._run_cleanup_hooks() + finally: + self._stream_error = None raise if self._map_update is not None: update = self._map_update(update) # type: ignore[assignment] diff --git a/python/packages/core/agent_framework/observability.py b/python/packages/core/agent_framework/observability.py index eafdc332e3..2bb3af408c 100644 --- a/python/packages/core/agent_framework/observability.py +++ b/python/packages/core/agent_framework/observability.py @@ -1323,13 +1323,11 @@ async def _finalize_stream() -> None: from ._types import ChatResponse try: - if not result_stream.consumed: - # Stream did not complete normally (e.g., it errored). Skip - # get_final_response() to avoid firing result hooks such as - # after_run context providers on error paths. Capture the - # stream error on the span so it is not silently swallowed. - if result_stream.stream_error is not None: - capture_exception(span=span, exception=result_stream.stream_error, timestamp=time_ns()) + if result_stream.stream_error is not None: + # Stream errored; skip get_final_response() to avoid firing + # result hooks such as after_run context providers on error + # paths. Capture the error on the span before returning. + capture_exception(span=span, exception=result_stream.stream_error, timestamp=time_ns()) return response: ChatResponse[Any] = await result_stream.get_final_response() duration = duration_state.get("duration") @@ -1587,13 +1585,11 @@ async def _finalize_stream() -> None: from ._types import AgentResponse try: - if not result_stream.consumed: - # Stream did not complete normally (e.g., it errored). Skip - # get_final_response() to avoid firing result hooks such as - # after_run context providers on error paths. Capture the - # stream error on the span so it is not silently swallowed. - if result_stream.stream_error is not None: - capture_exception(span=span, exception=result_stream.stream_error, timestamp=time_ns()) + if result_stream.stream_error is not None: + # Stream errored; skip get_final_response() to avoid firing + # result hooks such as after_run context providers on error + # paths. Capture the error on the span before returning. + capture_exception(span=span, exception=result_stream.stream_error, timestamp=time_ns()) return response: AgentResponse[Any] = await result_stream.get_final_response() duration = duration_state.get("duration") From a450307ccab5ea2a5eb7e1e32ca7cf8dffa14d4d Mon Sep 17 00:00:00 2001 From: droideronline Date: Tue, 14 Apr 2026 13:20:55 +0530 Subject: [PATCH 4/5] Python: Remove consumed/stream_error properties, use private attrs directly Per review feedback: since observability.py and _types.py are in the same package, accessing _stream_error directly is fine and the public properties are unnecessary. --- python/packages/core/agent_framework/_types.py | 10 ---------- python/packages/core/agent_framework/observability.py | 8 ++++---- 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/python/packages/core/agent_framework/_types.py b/python/packages/core/agent_framework/_types.py index 08a1dd3efc..584c1f0110 100644 --- a/python/packages/core/agent_framework/_types.py +++ b/python/packages/core/agent_framework/_types.py @@ -3117,16 +3117,6 @@ async def _run_cleanup_hooks(self) -> None: if isawaitable(result): await result - @property - def consumed(self) -> bool: - """True if the stream completed normally (StopAsyncIteration was reached).""" - return self._consumed - - @property - def stream_error(self) -> Exception | None: - """The exception that caused the stream to fail, or None if it completed normally.""" - return self._stream_error - @property def updates(self) -> Sequence[UpdateT]: return self._updates diff --git a/python/packages/core/agent_framework/observability.py b/python/packages/core/agent_framework/observability.py index 2bb3af408c..64a0c03414 100644 --- a/python/packages/core/agent_framework/observability.py +++ b/python/packages/core/agent_framework/observability.py @@ -1323,11 +1323,11 @@ async def _finalize_stream() -> None: from ._types import ChatResponse try: - if result_stream.stream_error is not None: + if result_stream._stream_error is not None: # Stream errored; skip get_final_response() to avoid firing # result hooks such as after_run context providers on error # paths. Capture the error on the span before returning. - capture_exception(span=span, exception=result_stream.stream_error, timestamp=time_ns()) + capture_exception(span=span, exception=result_stream._stream_error, timestamp=time_ns()) return response: ChatResponse[Any] = await result_stream.get_final_response() duration = duration_state.get("duration") @@ -1585,11 +1585,11 @@ async def _finalize_stream() -> None: from ._types import AgentResponse try: - if result_stream.stream_error is not None: + if result_stream._stream_error is not None: # Stream errored; skip get_final_response() to avoid firing # result hooks such as after_run context providers on error # paths. Capture the error on the span before returning. - capture_exception(span=span, exception=result_stream.stream_error, timestamp=time_ns()) + capture_exception(span=span, exception=result_stream._stream_error, timestamp=time_ns()) return response: AgentResponse[Any] = await result_stream.get_final_response() duration = duration_state.get("duration") From efbb61dba86138d71d7c7f3587c96e9dee0fe574 Mon Sep 17 00:00:00 2001 From: droideronline Date: Tue, 14 Apr 2026 13:35:58 +0530 Subject: [PATCH 5/5] Python: Fix Pyright reportPrivateUsage via inline ignore comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Keep _stream_error private (consistent with rest of ResponseStream), and suppress reportPrivateUsage at the call sites in observability.py with inline pyright: ignore comments — access is intentional within the package. --- python/packages/core/agent_framework/observability.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/packages/core/agent_framework/observability.py b/python/packages/core/agent_framework/observability.py index 64a0c03414..6998e5994f 100644 --- a/python/packages/core/agent_framework/observability.py +++ b/python/packages/core/agent_framework/observability.py @@ -1323,11 +1323,11 @@ async def _finalize_stream() -> None: from ._types import ChatResponse try: - if result_stream._stream_error is not None: + if result_stream._stream_error is not None: # pyright: ignore[reportPrivateUsage] # Stream errored; skip get_final_response() to avoid firing # result hooks such as after_run context providers on error # paths. Capture the error on the span before returning. - capture_exception(span=span, exception=result_stream._stream_error, timestamp=time_ns()) + capture_exception(span=span, exception=result_stream._stream_error, timestamp=time_ns()) # pyright: ignore[reportPrivateUsage] return response: ChatResponse[Any] = await result_stream.get_final_response() duration = duration_state.get("duration") @@ -1585,11 +1585,11 @@ async def _finalize_stream() -> None: from ._types import AgentResponse try: - if result_stream._stream_error is not None: + if result_stream._stream_error is not None: # pyright: ignore[reportPrivateUsage] # Stream errored; skip get_final_response() to avoid firing # result hooks such as after_run context providers on error # paths. Capture the error on the span before returning. - capture_exception(span=span, exception=result_stream._stream_error, timestamp=time_ns()) + capture_exception(span=span, exception=result_stream._stream_error, timestamp=time_ns()) # pyright: ignore[reportPrivateUsage] return response: AgentResponse[Any] = await result_stream.get_final_response() duration = duration_state.get("duration")