From 2a9c50998cedb90baa7cf5bf65380a6a371f6397 Mon Sep 17 00:00:00 2001 From: Andrew Hoblitzell Date: Fri, 22 Aug 2025 03:54:57 +0400 Subject: [PATCH 1/3] docs: add docs to util helper --- src/a2a/utils/helpers.py | 72 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 71 insertions(+), 1 deletion(-) diff --git a/src/a2a/utils/helpers.py b/src/a2a/utils/helpers.py index af0959a2..e81beaa0 100644 --- a/src/a2a/utils/helpers.py +++ b/src/a2a/utils/helpers.py @@ -139,6 +139,38 @@ def validate( and returns a boolean. error_message: An optional custom error message for the `UnsupportedOperationError`. If None, the string representation of the expression will be used. + + Examples: + Basic usage with capability check: + >>> class MyAgent: + ... def __init__(self): + ... self.agent_card = AgentCard( + ... capabilities=Capabilities(streaming=True) + ... ) + ... + ... @validate(lambda self: self.agent_card.capabilities.streaming) + ... async def stream_response(self, message: str): + ... return f'Streaming: {message}' + + With custom error message: + >>> class SecureAgent: + ... def __init__(self): + ... self.auth_enabled = False + ... + ... @validate( + ... lambda self: self.auth_enabled, + ... 'Authentication must be enabled for this operation', + ... ) + ... def secure_operation(self, data: str): + ... return f'Processing secure data: {data}' + + Error case example: + >>> agent = SecureAgent() + >>> agent.secure_operation('secret') # Raises ServerError + ServerError: UnsupportedOperationError(message="Authentication must be enabled for this operation") + + Note: + This decorator works with both sync and async methods automatically. """ def decorator(function: Callable) -> Callable: @@ -174,7 +206,7 @@ def sync_wrapper(self: Any, *args, **kwargs) -> Any: def validate_async_generator( expression: Callable[[Any], bool], error_message: str | None = None ): - """Decorator that validates if a given expression evaluates to True. + """Decorator that validates if a given expression evaluates to True for async generators. Typically used on class methods to check capabilities or configuration before executing the method's logic. If the expression is False, @@ -185,6 +217,44 @@ def validate_async_generator( and returns a boolean. error_message: An optional custom error message for the `UnsupportedOperationError`. If None, the string representation of the expression will be used. + + Examples: + Streaming capability validation: + >>> class StreamingAgent: + ... def __init__(self): + ... self.agent_card = AgentCard( + ... capabilities=Capabilities(streaming=True) + ... ) + ... + ... @validate_async_generator( + ... lambda self: self.agent_card.capabilities.streaming, + ... 'Streaming is not supported by this agent', + ... ) + ... async def stream_messages(self, count: int): + ... for i in range(count): + ... yield f'Message {i}' + + Feature flag validation: + >>> class FeatureAgent: + ... def __init__(self): + ... self.features = {'real_time': False} + ... + ... @validate_async_generator( + ... lambda self: self.features.get('real_time', False) + ... ) + ... async def real_time_updates(self): + ... async for update in self._get_updates(): + ... yield update + + Error case - validation fails: + >>> agent = FeatureAgent() + >>> async for msg in agent.real_time_updates(): + ... print(msg) # Raises ServerError before iteration starts + ServerError: UnsupportedOperationError(message="") + + Note: + This decorator is specifically for async generator methods (async def with yield). + The validation happens before the generator starts yielding values. """ def decorator(function): From 51b8d299ab305e239594c801e44c8b2bd7427762 Mon Sep 17 00:00:00 2001 From: Andrew Hoblitzell Date: Sat, 23 Aug 2025 12:19:50 -0400 Subject: [PATCH 2/3] update docs --- src/a2a/utils/helpers.py | 51 +++++++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/src/a2a/utils/helpers.py b/src/a2a/utils/helpers.py index e81beaa0..cb7242c0 100644 --- a/src/a2a/utils/helpers.py +++ b/src/a2a/utils/helpers.py @@ -143,12 +143,10 @@ def validate( Examples: Basic usage with capability check: >>> class MyAgent: - ... def __init__(self): - ... self.agent_card = AgentCard( - ... capabilities=Capabilities(streaming=True) - ... ) + ... def __init__(self, streaming_enabled: bool): + ... self.streaming_enabled = streaming_enabled ... - ... @validate(lambda self: self.agent_card.capabilities.streaming) + ... @validate(lambda self: self.streaming_enabled) ... async def stream_response(self, message: str): ... return f'Streaming: {message}' @@ -165,9 +163,13 @@ def validate( ... return f'Processing secure data: {data}' Error case example: + >>> from a2a.utils.errors import ServerError >>> agent = SecureAgent() - >>> agent.secure_operation('secret') # Raises ServerError - ServerError: UnsupportedOperationError(message="Authentication must be enabled for this operation") + >>> try: + ... agent.secure_operation('secret') + ... except ServerError as e: + ... print(e.error.message) + Authentication must be enabled for this operation Note: This decorator works with both sync and async methods automatically. @@ -221,36 +223,41 @@ def validate_async_generator( Examples: Streaming capability validation: >>> class StreamingAgent: - ... def __init__(self): - ... self.agent_card = AgentCard( - ... capabilities=Capabilities(streaming=True) - ... ) + ... def __init__(self, streaming_enabled: bool): + ... self.streaming_enabled = streaming_enabled ... ... @validate_async_generator( - ... lambda self: self.agent_card.capabilities.streaming, + ... lambda self: self.streaming_enabled, ... 'Streaming is not supported by this agent', ... ) ... async def stream_messages(self, count: int): ... for i in range(count): ... yield f'Message {i}' - Feature flag validation: + Error case - validation fails: + >>> from a2a.utils.errors import ServerError + >>> import asyncio + >>> >>> class FeatureAgent: ... def __init__(self): ... self.features = {'real_time': False} ... ... @validate_async_generator( - ... lambda self: self.features.get('real_time', False) + ... lambda self: self.features.get('real_time', False), + ... 'Real-time feature must be enabled to stream updates', ... ) ... async def real_time_updates(self): - ... async for update in self._get_updates(): - ... yield update - - Error case - validation fails: - >>> agent = FeatureAgent() - >>> async for msg in agent.real_time_updates(): - ... print(msg) # Raises ServerError before iteration starts - ServerError: UnsupportedOperationError(message="") + ... yield 'This should not be yielded' + >>> async def run_test(): + ... agent = FeatureAgent() + ... try: + ... async for _ in agent.real_time_updates(): + ... pass + ... except ServerError as e: + ... print(e.error.message) + >>> + >>> asyncio.run(run_test()) + Real-time feature must be enabled to stream updates Note: This decorator is specifically for async generator methods (async def with yield). From ae2083b518682f03de2f757ef7d4dcb4a22c47ec Mon Sep 17 00:00:00 2001 From: Andrew Hoblitzell Date: Thu, 4 Sep 2025 15:06:17 -0400 Subject: [PATCH 3/3] gemini comments --- src/a2a/utils/helpers.py | 57 +++++++++++++++++++++++++++++++--------- 1 file changed, 45 insertions(+), 12 deletions(-) diff --git a/src/a2a/utils/helpers.py b/src/a2a/utils/helpers.py index cb7242c0..96c1646a 100644 --- a/src/a2a/utils/helpers.py +++ b/src/a2a/utils/helpers.py @@ -141,16 +141,39 @@ def validate( If None, the string representation of the expression will be used. Examples: - Basic usage with capability check: + Demonstrating with an async method: + >>> import asyncio + >>> from a2a.utils.errors import ServerError + >>> >>> class MyAgent: ... def __init__(self, streaming_enabled: bool): ... self.streaming_enabled = streaming_enabled ... - ... @validate(lambda self: self.streaming_enabled) + ... @validate( + ... lambda self: self.streaming_enabled, + ... 'Streaming is not enabled for this agent', + ... ) ... async def stream_response(self, message: str): ... return f'Streaming: {message}' + >>> + >>> async def run_async_test(): + ... # Successful call + ... agent_ok = MyAgent(streaming_enabled=True) + ... result = await agent_ok.stream_response('hello') + ... print(result) + ... + ... # Call that fails validation + ... agent_fail = MyAgent(streaming_enabled=False) + ... try: + ... await agent_fail.stream_response('world') + ... except ServerError as e: + ... print(e.error.message) + >>> + >>> asyncio.run(run_async_test()) + Streaming: hello + Streaming is not enabled for this agent - With custom error message: + Demonstrating with a sync method: >>> class SecureAgent: ... def __init__(self): ... self.auth_enabled = False @@ -161,9 +184,8 @@ def validate( ... ) ... def secure_operation(self, data: str): ... return f'Processing secure data: {data}' - - Error case example: - >>> from a2a.utils.errors import ServerError + >>> + >>> # Error case example >>> agent = SecureAgent() >>> try: ... agent.secure_operation('secret') @@ -221,7 +243,10 @@ def validate_async_generator( If None, the string representation of the expression will be used. Examples: - Streaming capability validation: + Streaming capability validation with success case: + >>> import asyncio + >>> from a2a.utils.errors import ServerError + >>> >>> class StreamingAgent: ... def __init__(self, streaming_enabled: bool): ... self.streaming_enabled = streaming_enabled @@ -233,11 +258,18 @@ def validate_async_generator( ... async def stream_messages(self, count: int): ... for i in range(count): ... yield f'Message {i}' + >>> + >>> async def run_streaming_test(): + ... # Successful streaming + ... agent = StreamingAgent(streaming_enabled=True) + ... async for msg in agent.stream_messages(2): + ... print(msg) + >>> + >>> asyncio.run(run_streaming_test()) + Message 0 + Message 1 Error case - validation fails: - >>> from a2a.utils.errors import ServerError - >>> import asyncio - >>> >>> class FeatureAgent: ... def __init__(self): ... self.features = {'real_time': False} @@ -248,7 +280,8 @@ def validate_async_generator( ... ) ... async def real_time_updates(self): ... yield 'This should not be yielded' - >>> async def run_test(): + >>> + >>> async def run_error_test(): ... agent = FeatureAgent() ... try: ... async for _ in agent.real_time_updates(): @@ -256,7 +289,7 @@ def validate_async_generator( ... except ServerError as e: ... print(e.error.message) >>> - >>> asyncio.run(run_test()) + >>> asyncio.run(run_error_test()) Real-time feature must be enabled to stream updates Note: