-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Python: feat: Add Agent Framework to A2A bridge support #2403
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
moonbox3
merged 53 commits into
microsoft:main
from
Shubham-Kumar-2000:agent_framework_to_a2a
Apr 24, 2026
Merged
Changes from all commits
Commits
Show all changes
53 commits
Select commit
Hold shift + click to select a range
298a443
feat: Add Agent Framework to A2A bridge support
Shubham-Kumar-2000 828b733
fix: Update references from agent_thread_storage to _agent_thread_sto…
Shubham-Kumar-2000 255ee71
Refactor A2A agent framework and improve code structure
Shubham-Kumar-2000 1e600dc
fix: Lint fix new line added
Shubham-Kumar-2000 c7a8b80
test: Add unit tests for AgentThreadStorage and InMemoryAgentThreadSt…
Shubham-Kumar-2000 db5bdf3
refactor: Update type hints to use new syntax for Union and List
Shubham-Kumar-2000 a3a5c48
fix: Validate RequestContext for context_id and message before execution
Shubham-Kumar-2000 4c43248
Refactor tests and remove A2aExecutionContext references
Shubham-Kumar-2000 cc2df5c
Refactor A2AExecutor tests and remove event adapter
Shubham-Kumar-2000 c732a05
refactor: Remove AgentThreadStorage and InMemoryAgentThreadStorage cl…
Shubham-Kumar-2000 64c1cfb
feat: A2AExecutor to have its own override able save and get threads …
Shubham-Kumar-2000 1f8a002
fix: linter bugs
Shubham-Kumar-2000 293f25d
Merge branch 'main' of https://github.com/Shubham-Kumar-2000/agent-fr…
Shubham-Kumar-2000 e3bb889
removed unnecessary changes form core package
Shubham-Kumar-2000 960bb22
new line added
Shubham-Kumar-2000 ed01679
Merge branch 'main' of https://github.com/microsoft/agent-framework i…
Shubham-Kumar-2000 f8ee82b
Refactor A2AExecutor tests and update imports
Shubham-Kumar-2000 f14e7c4
Update A2A documentation: enhance usage examples for A2AAgent and A2A…
Shubham-Kumar-2000 9d84791
Updated uv lock
Shubham-Kumar-2000 6e61d13
Fix metadata assertion in TestA2AExecutorHandleEvents and reorder loa…
Shubham-Kumar-2000 14b17d8
Update agent card configuration: add default input and output modes, …
Shubham-Kumar-2000 ae34dcf
Fix assertion for metadata in TestA2AExecutorHandleEvents
Shubham-Kumar-2000 4674494
Fix formatting issues in TestA2AExecutorExecute and TestA2AExecutorIn…
Shubham-Kumar-2000 7022f7f
Enhance A2AExecutor documentation with examples and clarify agent exe…
Shubham-Kumar-2000 1c7a196
Merge branch 'main' into agent_framework_to_a2a
Shubham-Kumar-2000 25c5528
Merge branch 'main' of https://github.com/microsoft/agent-framework i…
Shubham-Kumar-2000 556f1d3
Merge branch 'agent_framework_to_a2a' of https://github.com/Shubham-K…
Shubham-Kumar-2000 8c9feba
Merge branch 'main' of https://github.com/microsoft/agent-framework i…
Shubham-Kumar-2000 568fbd5
Revert uv lock to main
Shubham-Kumar-2000 c2f09c3
Refactor A2AExecutor: Improve formatting and streamline constructor p…
Shubham-Kumar-2000 46c6dcf
Merge branch 'main' into agent_framework_to_a2a
Shubham-Kumar-2000 9791458
Merge branch 'main' into agent_framework_to_a2a
Shubham-Kumar-2000 4d3cf2b
Merge branch 'main' into agent_framework_to_a2a
Shubham-Kumar-2000 50f8e83
Apply suggestions from code review
Shubham-Kumar-2000 8e31511
Refactor A2AExecutor to use SupportsAgentRun and enhance logging; upd…
Shubham-Kumar-2000 0297c6f
Merge branch 'main' into agent_framework_to_a2a
eavanvalkenburg 327352f
Enhance A2AExecutor with streaming support and custom run arguments; …
Shubham-Kumar-2000 e5b42a9
Merge branch 'agent_framework_to_a2a' of https://github.com/Shubham-K…
Shubham-Kumar-2000 e857572
Enhance A2AExecutor event handling with streamed artifact tracking; u…
Shubham-Kumar-2000 81032f8
Merge branch 'main' into agent_framework_to_a2a
Shubham-Kumar-2000 59dea33
Refactor A2AExecutor to enforce type hints for stream and run_kwargs …
Shubham-Kumar-2000 c1340d3
Merge branch 'agent_framework_to_a2a' of https://github.com/Shubham-K…
Shubham-Kumar-2000 fad2e82
Merge branch 'main' into agent_framework_to_a2a
Shubham-Kumar-2000 dae1c19
Refactor A2AExecutor and tests: replace AsyncMock with MagicMock for …
Shubham-Kumar-2000 877cefd
Merge branch 'agent_framework_to_a2a' of https://github.com/Shubham-K…
Shubham-Kumar-2000 2ce08eb
Merge branch 'main' into agent_framework_to_a2a
Shubham-Kumar-2000 39b1491
refactor: streamline imports and improve code readability across mult…
Shubham-Kumar-2000 6a5f29d
Merge branch 'main' into agent_framework_to_a2a
Shubham-Kumar-2000 9bb1fae
feat: enhance A2AExecutor cancel method with context validation and f…
Shubham-Kumar-2000 0257d01
feat: implement get_uri_data utility function for extracting base64 d…
Shubham-Kumar-2000 1e5766c
fix: update import path for get_uri_data utility function in A2AExecu…
Shubham-Kumar-2000 9889e68
fix: correct error message handling in A2AExecutor and update test as…
Shubham-Kumar-2000 0f46d52
Merge branch 'main' into agent_framework_to_a2a
Shubham-Kumar-2000 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
275 changes: 275 additions & 0 deletions
275
python/packages/a2a/agent_framework_a2a/_a2a_executor.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,275 @@ | ||
| # Copyright (c) Microsoft. All rights reserved. | ||
|
|
||
| import logging | ||
| from asyncio import CancelledError | ||
| from collections.abc import Mapping | ||
| from functools import partial | ||
| from typing import Any | ||
|
|
||
| from a2a.server.agent_execution import AgentExecutor, RequestContext | ||
| from a2a.server.events import EventQueue | ||
| from a2a.server.tasks import TaskUpdater | ||
| from a2a.types import FilePart, FileWithBytes, FileWithUri, Part, TaskState, TextPart | ||
| from a2a.utils import new_task | ||
| from agent_framework import ( | ||
| AgentResponseUpdate, | ||
| AgentSession, | ||
| Message, | ||
| SupportsAgentRun, | ||
| ) | ||
| from typing_extensions import override | ||
|
|
||
| from agent_framework_a2a._utils import get_uri_data | ||
|
|
||
| logger = logging.getLogger("agent_framework.a2a") | ||
|
|
||
|
|
||
| class A2AExecutor(AgentExecutor): | ||
| """Execute AI agents using the A2A (Agent-to-Agent) protocol. | ||
|
|
||
| The A2AExecutor bridges AI agents built with the agent_framework library and the A2A protocol, | ||
| enabling structured agent execution with event-driven communication. It handles execution | ||
| contexts, delegates history management to the agent's session, and converts agent | ||
| responses into A2A protocol events. | ||
|
|
||
| The executor supports executing an Agent or WorkflowAgent. It provides comprehensive | ||
| error handling with task status updates and supports various content types including text, | ||
| binary data, and URI-based content. | ||
|
|
||
| Example: | ||
| .. code-block:: python | ||
|
|
||
| from a2a.server.apps import A2AStarletteApplication | ||
| from a2a.server.request_handlers import DefaultRequestHandler | ||
| from a2a.server.tasks import InMemoryTaskStore | ||
| from a2a.types import AgentCapabilities, AgentCard | ||
| from agent_framework.a2a import A2AExecutor | ||
| from agent_framework.openai import OpenAIResponsesClient | ||
|
|
||
| public_agent_card = AgentCard( | ||
| name="Food Agent", | ||
| description="A simple agent that provides food-related information.", | ||
| url="http://localhost:9999/", | ||
| version="1.0.0", | ||
| defaultInputModes=["text"], | ||
| defaultOutputModes=["text"], | ||
| capabilities=AgentCapabilities(streaming=True), | ||
| skills=[], | ||
| ) | ||
|
|
||
| # Create an agent | ||
| agent = OpenAIResponsesClient().as_agent( | ||
| name="Food Agent", | ||
| instructions="A simple agent that provides food-related information.", | ||
| ) | ||
|
|
||
| # Set up the A2A server with the A2AExecutor enabled for streaming | ||
| # and passing custom keyword arguments to the agent's run method. | ||
| request_handler = DefaultRequestHandler( | ||
| agent_executor=A2AExecutor(agent, stream=True, run_kwargs={"client_kwargs": {"max_tokens": 500}}), | ||
| task_store=InMemoryTaskStore(), | ||
| ) | ||
|
|
||
| server = A2AStarletteApplication( | ||
| agent_card=public_agent_card, | ||
| http_handler=request_handler, | ||
| ).build() | ||
|
|
||
| Args: | ||
| agent: The AI agent to execute. | ||
| stream: Whether to stream the agent response. Defaults to False. | ||
| run_kwargs: Additional keyword arguments to pass to the agent's run method. | ||
| """ | ||
|
|
||
| def __init__(self, agent: SupportsAgentRun, stream: bool = False, run_kwargs: Mapping[str, Any] | None = None): | ||
| """Initialize the A2AExecutor with the specified agent. | ||
|
|
||
| Args: | ||
| agent: The AI agent or workflow to execute. | ||
| stream: Whether to stream the agent response. Defaults to False. | ||
| run_kwargs: Additional keyword arguments to pass to the agent's run method. | ||
| Cannot contain 'session' or 'stream' as these are managed by the executor. | ||
|
|
||
| Raises: | ||
| ValueError: If run_kwargs contains 'session' or 'stream'. | ||
| """ | ||
| super().__init__() | ||
| self._agent: SupportsAgentRun = agent | ||
| self._stream: bool = stream | ||
| if run_kwargs: | ||
| if "session" in run_kwargs: | ||
| raise ValueError("run_kwargs cannot contain 'session' as it is managed by the executor.") | ||
| if "stream" in run_kwargs: | ||
| raise ValueError("run_kwargs cannot contain 'stream' as it is managed by the executor.") | ||
| self._run_kwargs: Mapping[str, Any] = run_kwargs or {} | ||
|
|
||
| @override | ||
| async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None: | ||
| """Cancel agent execution for the given request context. | ||
|
|
||
| Uses a TaskUpdater to send a cancellation event through the provided event queue. | ||
|
|
||
| Args: | ||
| context: The request context identifying the task to cancel. | ||
| event_queue: The event queue to publish the cancellation event to. | ||
|
|
||
| Raises: | ||
| ValueError: If context_id is not provided in the RequestContext. | ||
| """ | ||
| if context.context_id is None: | ||
| raise ValueError("Context ID must be provided in the RequestContext") | ||
|
|
||
| updater = TaskUpdater( | ||
| event_queue=event_queue, | ||
| task_id=context.task_id or "", | ||
| context_id=context.context_id, | ||
| ) | ||
|
|
||
| await updater.cancel() | ||
|
|
||
| @override | ||
| async def execute(self, context: RequestContext, event_queue: EventQueue) -> None: | ||
| """Execute the agent with the given context and event queue. | ||
|
|
||
| Orchestrates the agent execution process: sets up the agent session, | ||
| executes the agent, processes response messages, and handles errors with appropriate task status updates. | ||
| """ | ||
| if context.context_id is None: | ||
| raise ValueError("Context ID must be provided in the RequestContext") | ||
| if context.message is None: | ||
| raise ValueError("Message must be provided in the RequestContext") | ||
|
|
||
| query = context.get_user_input() | ||
| task = context.current_task | ||
|
|
||
| if not task: | ||
| task = new_task(context.message) | ||
| await event_queue.enqueue_event(task) | ||
|
|
||
| updater = TaskUpdater(event_queue, task.id, context.context_id) | ||
| await updater.submit() | ||
|
|
||
| try: | ||
| await updater.start_work() | ||
|
|
||
| session = self._agent.create_session(session_id=task.context_id) | ||
|
|
||
| if self._stream: | ||
| await self._run_stream(query, session, updater) | ||
| else: | ||
| await self._run(query, session, updater) | ||
|
|
||
| # Mark as complete | ||
| await updater.complete() | ||
| except CancelledError: | ||
| await updater.update_status(state=TaskState.canceled, final=True) | ||
| except Exception as e: | ||
| logger.exception("A2AExecutor encountered an error during execution.", exc_info=e) | ||
| await updater.update_status( | ||
| state=TaskState.failed, | ||
| final=True, | ||
| message=updater.new_agent_message([Part(root=TextPart(text=str(e)))]), | ||
| ) | ||
|
|
||
| async def _run_stream(self, query: Any, session: AgentSession, updater: TaskUpdater) -> None: | ||
| """Run the agent in streaming mode and publish updates to the task updater.""" | ||
| response_stream = self._agent.run(query, session=session, stream=True, **self._run_kwargs) | ||
| streamed_artifact_ids: set[str] = set() | ||
| await ( | ||
| response_stream.with_transform_hook( | ||
| partial(self.handle_events, updater=updater, streamed_artifact_ids=streamed_artifact_ids) | ||
| ) | ||
| ).get_final_response() | ||
|
|
||
| async def _run(self, query: Any, session: AgentSession, updater: TaskUpdater) -> None: | ||
| """Run the agent in non-streaming mode and publish messages to the task updater.""" | ||
| response = await self._agent.run(query, session=session, stream=False, **self._run_kwargs) | ||
| response_messages = response.messages | ||
|
|
||
| if not isinstance(response_messages, list): | ||
| response_messages = [response_messages] | ||
|
|
||
| for message in response_messages: | ||
| await self.handle_events(message, updater) | ||
|
|
||
| async def handle_events( | ||
| self, item: Message | AgentResponseUpdate, updater: TaskUpdater, streamed_artifact_ids: set[str] | None = None | ||
| ) -> None: | ||
| """Convert agent response items (Messages or Updates) to A2A protocol events. | ||
|
|
||
| Processes Message or AgentResponseUpdate objects and converts them into A2A protocol format. | ||
| Handles text, data, and URI content. USER role messages are skipped. | ||
|
|
||
| Users can override this method in a subclass to implement custom transformations | ||
| from their agent's output format to A2A protocol events. | ||
|
|
||
| Args: | ||
| item: The agent response item (Message or AgentResponseUpdate) to process. | ||
| updater: The task updater to publish events to. | ||
| streamed_artifact_ids: A set of artifact IDs that have already been streamed. | ||
| Used to prevent duplicate updates for the same artifact. | ||
|
|
||
| Example: | ||
| .. code-block:: python | ||
|
|
||
| class CustomA2AExecutor(A2AExecutor): | ||
| async def handle_events( | ||
| self, | ||
| item: Message | AgentResponseUpdate, | ||
| updater: TaskUpdater, | ||
| streamed_artifact_ids: set[str] | None = None, | ||
| ) -> None: | ||
| # Custom logic to transform item contents | ||
| if item.role == "assistant" and item.contents: | ||
| parts = [Part(root=TextPart(text=f"Custom: {item.contents[0].text}"))] | ||
| await updater.update_status( | ||
| state=TaskState.working, | ||
| message=updater.new_agent_message(parts=parts), | ||
| ) | ||
| else: | ||
| await super().handle_events(item, updater) | ||
| """ | ||
| role = getattr(item, "role", None) | ||
| if role == "user": | ||
| # This is a user message, we can ignore it in the context of task updates | ||
| return | ||
|
|
||
| parts: list[Part] = [] | ||
| metadata = getattr(item, "additional_properties", None) | ||
|
moonbox3 marked this conversation as resolved.
|
||
|
|
||
| # AgentResponseUpdate uses 'contents', Message uses 'contents' | ||
| contents = getattr(item, "contents", []) | ||
|
|
||
| for content in contents: | ||
| if content.type == "text" and content.text: | ||
| parts.append(Part(root=TextPart(text=content.text))) | ||
| elif content.type == "data" and content.uri: | ||
| base64_str = get_uri_data(content.uri) | ||
| parts.append(Part(root=FilePart(file=FileWithBytes(bytes=base64_str, mime_type=content.media_type)))) | ||
| elif content.type == "uri" and content.uri: | ||
|
moonbox3 marked this conversation as resolved.
|
||
| parts.append(Part(root=FilePart(file=FileWithUri(uri=content.uri, mime_type=content.media_type)))) | ||
| else: | ||
| # Silently skip unsupported content types | ||
| logger.warning("A2AExecutor does not yet support content type: %s. Omitted.", content.type) | ||
|
|
||
| if parts: | ||
| if isinstance(item, AgentResponseUpdate): | ||
| # For streaming updates, we send TaskArtifactUpdateEvent via add_artifact | ||
| await updater.add_artifact( | ||
| parts=parts, | ||
| artifact_id=item.message_id, | ||
| metadata=metadata, | ||
| append=( | ||
| True | ||
| if streamed_artifact_ids is not None and item.message_id in (streamed_artifact_ids or set()) | ||
| else None | ||
| ), | ||
| ) | ||
| if item.message_id and streamed_artifact_ids is not None: | ||
| streamed_artifact_ids.add(item.message_id) | ||
| else: | ||
| # For final messages, we send TaskStatusUpdateEvent with 'working' state | ||
| await updater.update_status( | ||
| state=TaskState.working, | ||
| message=updater.new_agent_message(parts=parts, metadata=metadata), | ||
| ) | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.