Skip to content
Merged
112 changes: 111 additions & 1 deletion src/a2a/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,62 @@ def validate(
and returns a boolean.
error_message: An optional custom error message for the `UnsupportedOperationError`.
If None, the string representation of the expression will be used.

Examples:
Demonstrating with an async method:
>>> import asyncio
>>> from a2a.utils.errors import ServerError
>>>
>>> class MyAgent:
... def __init__(self, streaming_enabled: bool):
... self.streaming_enabled = streaming_enabled
...
... @validate(
... lambda self: self.streaming_enabled,
... 'Streaming is not enabled for this agent',
... )
... async def stream_response(self, message: str):
... return f'Streaming: {message}'
>>>
>>> async def run_async_test():
... # Successful call
... agent_ok = MyAgent(streaming_enabled=True)
... result = await agent_ok.stream_response('hello')
... print(result)
...
... # Call that fails validation
... agent_fail = MyAgent(streaming_enabled=False)
... try:
... await agent_fail.stream_response('world')
... except ServerError as e:
... print(e.error.message)
>>>
>>> asyncio.run(run_async_test())
Streaming: hello
Streaming is not enabled for this agent

Demonstrating with a sync method:
>>> class SecureAgent:
... def __init__(self):
... self.auth_enabled = False
...
... @validate(
... lambda self: self.auth_enabled,
... 'Authentication must be enabled for this operation',
... )
... def secure_operation(self, data: str):
... return f'Processing secure data: {data}'
>>>
>>> # Error case example
>>> agent = SecureAgent()
>>> try:
... agent.secure_operation('secret')
... except ServerError as e:
... print(e.error.message)
Authentication must be enabled for this operation

Note:
This decorator works with both sync and async methods automatically.
"""

def decorator(function: Callable) -> Callable:
Expand Down Expand Up @@ -174,7 +230,7 @@ def sync_wrapper(self: Any, *args, **kwargs) -> Any:
def validate_async_generator(
expression: Callable[[Any], bool], error_message: str | None = None
):
"""Decorator that validates if a given expression evaluates to True.
"""Decorator that validates if a given expression evaluates to True for async generators.

Typically used on class methods to check capabilities or configuration
before executing the method's logic. If the expression is False,
Expand All @@ -185,6 +241,60 @@ def validate_async_generator(
and returns a boolean.
error_message: An optional custom error message for the `UnsupportedOperationError`.
If None, the string representation of the expression will be used.

Examples:
Streaming capability validation with success case:
>>> import asyncio
>>> from a2a.utils.errors import ServerError
>>>
>>> class StreamingAgent:
... def __init__(self, streaming_enabled: bool):
... self.streaming_enabled = streaming_enabled
...
... @validate_async_generator(
... lambda self: self.streaming_enabled,
... 'Streaming is not supported by this agent',
... )
... async def stream_messages(self, count: int):
... for i in range(count):
... yield f'Message {i}'
>>>
>>> async def run_streaming_test():
... # Successful streaming
... agent = StreamingAgent(streaming_enabled=True)
... async for msg in agent.stream_messages(2):
... print(msg)
>>>
>>> asyncio.run(run_streaming_test())
Message 0
Message 1

Error case - validation fails:
>>> class FeatureAgent:
... def __init__(self):
... self.features = {'real_time': False}
...
... @validate_async_generator(
... lambda self: self.features.get('real_time', False),
... 'Real-time feature must be enabled to stream updates',
... )
... async def real_time_updates(self):
... yield 'This should not be yielded'
>>>
>>> async def run_error_test():
... agent = FeatureAgent()
... try:
... async for _ in agent.real_time_updates():
... pass
... except ServerError as e:
... print(e.error.message)
>>>
>>> asyncio.run(run_error_test())
Real-time feature must be enabled to stream updates

Note:
This decorator is specifically for async generator methods (async def with yield).
The validation happens before the generator starts yielding values.
"""

def decorator(function):
Expand Down
Loading