diff --git a/src/a2a/utils/helpers.py b/src/a2a/utils/helpers.py index af0959a2..96c1646a 100644 --- a/src/a2a/utils/helpers.py +++ b/src/a2a/utils/helpers.py @@ -139,6 +139,62 @@ def validate( and returns a boolean. error_message: An optional custom error message for the `UnsupportedOperationError`. If None, the string representation of the expression will be used. + + Examples: + Demonstrating with an async method: + >>> import asyncio + >>> from a2a.utils.errors import ServerError + >>> + >>> class MyAgent: + ... def __init__(self, streaming_enabled: bool): + ... self.streaming_enabled = streaming_enabled + ... + ... @validate( + ... lambda self: self.streaming_enabled, + ... 'Streaming is not enabled for this agent', + ... ) + ... async def stream_response(self, message: str): + ... return f'Streaming: {message}' + >>> + >>> async def run_async_test(): + ... # Successful call + ... agent_ok = MyAgent(streaming_enabled=True) + ... result = await agent_ok.stream_response('hello') + ... print(result) + ... + ... # Call that fails validation + ... agent_fail = MyAgent(streaming_enabled=False) + ... try: + ... await agent_fail.stream_response('world') + ... except ServerError as e: + ... print(e.error.message) + >>> + >>> asyncio.run(run_async_test()) + Streaming: hello + Streaming is not enabled for this agent + + Demonstrating with a sync method: + >>> class SecureAgent: + ... def __init__(self): + ... self.auth_enabled = False + ... + ... @validate( + ... lambda self: self.auth_enabled, + ... 'Authentication must be enabled for this operation', + ... ) + ... def secure_operation(self, data: str): + ... return f'Processing secure data: {data}' + >>> + >>> # Error case example + >>> agent = SecureAgent() + >>> try: + ... agent.secure_operation('secret') + ... except ServerError as e: + ... print(e.error.message) + Authentication must be enabled for this operation + + Note: + This decorator works with both sync and async methods automatically. """ def decorator(function: Callable) -> Callable: @@ -174,7 +230,7 @@ def sync_wrapper(self: Any, *args, **kwargs) -> Any: def validate_async_generator( expression: Callable[[Any], bool], error_message: str | None = None ): - """Decorator that validates if a given expression evaluates to True. + """Decorator that validates if a given expression evaluates to True for async generators. Typically used on class methods to check capabilities or configuration before executing the method's logic. If the expression is False, @@ -185,6 +241,60 @@ def validate_async_generator( and returns a boolean. error_message: An optional custom error message for the `UnsupportedOperationError`. If None, the string representation of the expression will be used. + + Examples: + Streaming capability validation with success case: + >>> import asyncio + >>> from a2a.utils.errors import ServerError + >>> + >>> class StreamingAgent: + ... def __init__(self, streaming_enabled: bool): + ... self.streaming_enabled = streaming_enabled + ... + ... @validate_async_generator( + ... lambda self: self.streaming_enabled, + ... 'Streaming is not supported by this agent', + ... ) + ... async def stream_messages(self, count: int): + ... for i in range(count): + ... yield f'Message {i}' + >>> + >>> async def run_streaming_test(): + ... # Successful streaming + ... agent = StreamingAgent(streaming_enabled=True) + ... async for msg in agent.stream_messages(2): + ... print(msg) + >>> + >>> asyncio.run(run_streaming_test()) + Message 0 + Message 1 + + Error case - validation fails: + >>> class FeatureAgent: + ... def __init__(self): + ... self.features = {'real_time': False} + ... + ... @validate_async_generator( + ... lambda self: self.features.get('real_time', False), + ... 'Real-time feature must be enabled to stream updates', + ... ) + ... async def real_time_updates(self): + ... yield 'This should not be yielded' + >>> + >>> async def run_error_test(): + ... agent = FeatureAgent() + ... try: + ... async for _ in agent.real_time_updates(): + ... pass + ... except ServerError as e: + ... print(e.error.message) + >>> + >>> asyncio.run(run_error_test()) + Real-time feature must be enabled to stream updates + + Note: + This decorator is specifically for async generator methods (async def with yield). + The validation happens before the generator starts yielding values. """ def decorator(function):