-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Python: Emit AG-UI events for MCP tool calls, results, and text reasoning #4233
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
47c6ab5
4773a90
4060d76
92c48b3
c76f966
8122740
8fc4bdf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,11 @@ | |
| from ag_ui.core import ( | ||
| BaseEvent, | ||
| CustomEvent, | ||
| ReasoningEndEvent, | ||
| ReasoningMessageContentEvent, | ||
| ReasoningMessageEndEvent, | ||
| ReasoningMessageStartEvent, | ||
| ReasoningStartEvent, | ||
| RunFinishedEvent, | ||
| StateSnapshotEvent, | ||
| TextMessageContentEvent, | ||
|
|
@@ -355,6 +360,142 @@ def _emit_usage(content: Content) -> list[BaseEvent]: | |
| return [CustomEvent(name="usage", value=usage_details)] | ||
|
|
||
|
|
||
| def _emit_mcp_tool_call(content: Content, flow: FlowState) -> list[BaseEvent]: | ||
| """Emit ToolCall start/args events for MCP server tool call content. | ||
|
|
||
| MCP tool calls arrive as complete items (not streamed deltas), so we emit a | ||
| ``ToolCallStartEvent`` (and, when arguments are present, a ``ToolCallArgsEvent``) | ||
| immediately. This maps MCP-specific fields (tool_name, server_name) to the | ||
| same AG-UI ToolCall* events used by regular function calls, making MCP tool | ||
| execution visible to AG-UI consumers. Completion/end events are handled | ||
| separately by ``_emit_mcp_tool_result``. | ||
| """ | ||
| events: list[BaseEvent] = [] | ||
|
|
||
| tool_call_id = content.call_id or generate_event_id() | ||
| tool_name = content.tool_name or "mcp_tool" | ||
|
|
||
| # Prefix with server name for disambiguation when available | ||
| display_name = f"{content.server_name}/{tool_name}" if content.server_name else tool_name | ||
|
|
||
| events.append( | ||
| ToolCallStartEvent( | ||
| tool_call_id=tool_call_id, | ||
| tool_call_name=display_name, | ||
| parent_message_id=flow.message_id, | ||
| ) | ||
| ) | ||
|
|
||
| # Serialize arguments | ||
| args_str = "" | ||
| if content.arguments: | ||
| args_str = ( | ||
| content.arguments | ||
| if isinstance(content.arguments, str) | ||
| else json.dumps(make_json_safe(content.arguments)) | ||
| ) | ||
| events.append(ToolCallArgsEvent(tool_call_id=tool_call_id, delta=args_str)) | ||
|
|
||
| # Track in flow state for MESSAGES_SNAPSHOT | ||
| tool_entry = { | ||
| "id": tool_call_id, | ||
| "type": "function", | ||
| "function": {"name": display_name, "arguments": args_str}, | ||
| } | ||
| flow.pending_tool_calls.append(tool_entry) | ||
| flow.tool_calls_by_id[tool_call_id] = tool_entry | ||
|
|
||
| return events | ||
|
|
||
|
|
||
| def _emit_mcp_tool_result(content: Content, flow: FlowState) -> list[BaseEvent]: | ||
| """Emit ToolCallResult events for MCP server tool result content. | ||
|
|
||
| Maps MCP tool results to the same AG-UI ToolCallEnd + ToolCallResult events | ||
| used by regular function results. Uses ``content.output`` (the MCP-specific | ||
| result field) instead of ``content.result``. | ||
|
|
||
| Mirrors the FlowState cleanup performed by ``_emit_tool_result`` (resetting | ||
| tool_call_id/tool_call_name, closing any open text message) so MCP results | ||
| behave consistently with standard tool results. | ||
| """ | ||
| events: list[BaseEvent] = [] | ||
|
|
||
| if not content.call_id: | ||
| logger.warning("MCP tool result content missing call_id, skipping") | ||
| return events | ||
|
|
||
| events.append(ToolCallEndEvent(tool_call_id=content.call_id)) | ||
| flow.tool_calls_ended.add(content.call_id) | ||
|
|
||
| raw_output = content.output if content.output is not None else "" | ||
| result_content = raw_output if isinstance(raw_output, str) else json.dumps(make_json_safe(raw_output)) | ||
| message_id = generate_event_id() | ||
| events.append( | ||
| ToolCallResultEvent( | ||
| message_id=message_id, | ||
| tool_call_id=content.call_id, | ||
| content=result_content, | ||
| role="tool", | ||
| ) | ||
| ) | ||
|
|
||
| flow.tool_results.append( | ||
| { | ||
| "id": message_id, | ||
| "role": "tool", | ||
| "toolCallId": content.call_id, | ||
| "content": result_content, | ||
| } | ||
| ) | ||
|
|
||
| # Mirror _emit_tool_result cleanup so MCP results behave consistently | ||
| flow.tool_call_id = None | ||
| flow.tool_call_name = None | ||
|
|
||
| if flow.message_id: | ||
| logger.debug("Closing text message for MCP tool result: message_id=%s", flow.message_id) | ||
| events.append(TextMessageEndEvent(message_id=flow.message_id)) | ||
| flow.message_id = None | ||
| flow.accumulated_text = "" | ||
|
|
||
| return events | ||
|
Comment on lines
+428
to
+462
|
||
|
|
||
|
|
||
| def _emit_text_reasoning(content: Content) -> list[BaseEvent]: | ||
| """Emit AG-UI reasoning events for text_reasoning content. | ||
|
|
||
| Uses the protocol-defined reasoning event types (``ReasoningStartEvent``, | ||
| ``ReasoningMessageStartEvent``, ``ReasoningMessageContentEvent``, | ||
| ``ReasoningMessageEndEvent``, ``ReasoningEndEvent``) so that AG-UI consumers | ||
| such as CopilotKit can render reasoning natively. | ||
|
|
||
| Only ``content.text`` is used for the visible reasoning message. If | ||
| ``content.protected_data`` is present it is forwarded as the | ||
| ``encrypted_value`` field on the ``ReasoningMessageEndEvent`` so that | ||
| consumers can persist it for state continuity without conflating it | ||
| with display text. | ||
| """ | ||
| text = content.text or "" | ||
| if not text and content.protected_data is None: | ||
| return [] | ||
|
|
||
| message_id = content.id or generate_event_id() | ||
|
|
||
| events: list[BaseEvent] = [ | ||
| ReasoningStartEvent(), | ||
| ReasoningMessageStartEvent(message_id=message_id, role="reasoning"), | ||
| ] | ||
|
|
||
| if text: | ||
| events.append(ReasoningMessageContentEvent(message_id=message_id, delta=text)) | ||
|
|
||
| events.append(ReasoningMessageEndEvent(message_id=message_id)) | ||
| events.append(ReasoningEndEvent()) | ||
|
|
||
| return events | ||
|
|
||
|
|
||
| def _emit_content( | ||
| content: Any, | ||
| flow: FlowState, | ||
|
|
@@ -374,5 +515,11 @@ def _emit_content( | |
| return _emit_approval_request(content, flow, predictive_handler, require_confirmation) | ||
| if content_type == "usage": | ||
| return _emit_usage(content) | ||
| if content_type == "mcp_server_tool_call": | ||
| return _emit_mcp_tool_call(content, flow) | ||
| if content_type == "mcp_server_tool_result": | ||
| return _emit_mcp_tool_result(content, flow) | ||
| if content_type == "text_reasoning": | ||
| return _emit_text_reasoning(content) | ||
| logger.debug("Skipping unsupported content type in AG-UI emitter: %s", content_type) | ||
| return [] | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in the event that call_id is missing, we simply return. Should we log anything?