I tried creating an executor that takes a generic type and annotating the context as `ctx[T]`, but validation fails at runtime because the edge validation checks directly against the generic type parameter and not the concrete type it stands for.
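For illustration, here is a stdlib-only sketch of what is probably behind the behaviour described above. It assumes the framework resolves handler types by inspecting annotations, which the issue suggests but does not confirm. `FakeContext` and `GenericEcho` are hypothetical stand-ins, not agent_framework classes.

```python
from typing import Generic, TypeVar, get_type_hints

T = TypeVar("T")


class FakeContext(Generic[T]):
    """Hypothetical stand-in for WorkflowContext; not the real class."""


class GenericEcho(Generic[T]):
    """Hypothetical generic executor whose handler is annotated with T."""

    async def echo(self, message: T, ctx: FakeContext[T]) -> None:
        ...


# Subscripting the class does not create a new handler function:
# GenericEcho[int].echo is the same function object as GenericEcho.echo.
same_function = GenericEcho[int].echo is GenericEcho.echo

# Annotation-based inspection therefore still reports the TypeVar,
# not the concrete type that was used to subscript the class.
generic_hints = get_type_hints(GenericEcho[int].echo)
message_is_typevar = isinstance(generic_hints["message"], TypeVar)

print(same_function, message_is_typevar, generic_hints["message"])
```

If the validation works this way, any check that compares an edge's message type against the handler annotation would see `T` itself, which matches the failure described.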
As a workaround, I created a factory that assigns the parameter annotations manually; this seems to pass validation:
```python
def create_echo_executor(id: str, message_type: Type) -> Executor:
    """
    Factory that creates a concrete EchoExecutor whose handler has explicit
    type annotations matching *message_type*, so agent_framework can resolve
    input/output types at class-definition time.

    Usage:
        echo = create_echo_executor("start", PageContent)
    """

    class _EchoExecutor(Executor):
        async def echo(self, message, ctx):
            if not isinstance(message, message_type):
                raise TypeError(
                    f"Expected message of type {message_type.__name__}, "
                    f"got {type(message).__name__}"
                )
            await ctx.send_message(message)
            await ctx.yield_output(message)
            return message

    # Set concrete type annotations BEFORE applying @handler
    _EchoExecutor.echo.__annotations__["message"] = message_type
    _EchoExecutor.echo.__annotations__["ctx"] = WorkflowContext[message_type]
    _EchoExecutor.echo.__annotations__["return"] = message_type
    _EchoExecutor.echo = handler(_EchoExecutor.echo)

    return _EchoExecutor(id=id)
```
So, it seems there are 2 issues:
1. Allow sending the workflow input payload to multiple executors. Ideally the EchoExecutor could be created like `myCustomEcho = EchoExecutor(id, MyCustomType)` or similar.
2. Allow generic executor types to be piped properly without the `__annotations__` workaround.
Code Sample
```python
from typing import Type

from agent_framework import (
    Executor,
    WorkflowContext,
    handler,
)


def create_echo_executor(id: str, message_type: Type) -> Executor:
    """
    Factory that creates a concrete EchoExecutor whose handler has explicit
    type annotations matching *message_type*, so agent_framework can resolve
    input/output types at class-definition time.

    Usage:
        echo = create_echo_executor("start", PageContent)
    """

    class _EchoExecutor(Executor):
        async def echo(self, message, ctx):
            if not isinstance(message, message_type):
                raise TypeError(
                    f"Expected message of type {message_type.__name__}, "
                    f"got {type(message).__name__}"
                )
            await ctx.send_message(message)
            await ctx.yield_output(message)
            return message

    # Set concrete type annotations BEFORE applying @handler
    _EchoExecutor.echo.__annotations__["message"] = message_type
    _EchoExecutor.echo.__annotations__["ctx"] = WorkflowContext[message_type]
    _EchoExecutor.echo.__annotations__["return"] = message_type
    _EchoExecutor.echo = handler(_EchoExecutor.echo)

    return _EchoExecutor(id=id)
```
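The annotation-patching step in the workaround can be checked in isolation. The sketch below uses only the standard library; `make_echo_class` is a hypothetical stand-in for the factory and does not involve agent_framework. It only shows that annotations written into `__annotations__` after the class body are visible to `typing.get_type_hints` and `inspect.signature`, which is presumably why doing so before applying `handler` satisfies the validation.

```python
import inspect
from typing import get_type_hints


def make_echo_class(message_type: type) -> type:
    """Hypothetical stdlib-only factory; no agent_framework involved."""

    class _Echo:
        async def echo(self, message, ctx):
            return message

    # Patch concrete annotations onto the plain function object.
    _Echo.echo.__annotations__["message"] = message_type
    _Echo.echo.__annotations__["return"] = message_type
    return _Echo


# Each call builds a fresh class, so the annotations do not leak between types.
IntEcho = make_echo_class(int)
StrEcho = make_echo_class(str)

int_hints = get_type_hints(IntEcho.echo)
str_param = inspect.signature(StrEcho.echo).parameters["message"]

print(int_hints, str_param.annotation)
```

Because every call to the factory defines a new class, each concrete message type gets its own handler function with its own annotations. The cost is one class per type.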
Description
I'm trying to build a workflow that looks like this:

```mermaid
graph LR
    Input([Input Payload]) -->|TypeX| ExecutorA -->|TypeA| ExecutorB -->|TypeB| ExecutorC
    Input -->|TypeX| ExecutorC
```

Since the input payload for a workflow cannot be sent to multiple executors, I created a start step that works as an echo:

```mermaid
graph LR
    Input([Input Payload]) -->|TypeX| StartExecutor -->|TypeX| ExecutorA -->|TypeA| ExecutorB -->|TypeB| ExecutorC
    StartExecutor -->|TypeX| ExecutorC
```
Error Messages / Stack Traces
Package Versions
agent-framework-azure-ai==1.0.0rc3
Python Version
No response
Additional Context
No response