From 9ebec6043ccf642113574ab12021fa06d01d925a Mon Sep 17 00:00:00 2001 From: Pierre Date: Tue, 11 Feb 2025 17:56:13 -0600 Subject: [PATCH 1/5] add new workflow for agent delegation --- examples/workflows/README.md | 21 +++ examples/workflows/agent_delegation.py | 172 +++++++++++++++++++++++++ 2 files changed, 193 insertions(+) create mode 100644 examples/workflows/agent_delegation.py diff --git a/examples/workflows/README.md b/examples/workflows/README.md index 478390b..a67d61e 100644 --- a/examples/workflows/README.md +++ b/examples/workflows/README.md @@ -58,6 +58,27 @@ The Chain of Agents pattern is designed for processing long documents or complex For an implementation example, see [chain_of_agents.py](chain_of_agents.py). +## 7. Agent Delegation +The Agent Delegation pattern enables dynamic and flexible workflows where one agent can invoke other agents through tools. This pattern is particularly useful when you want an agent to dynamically choose which specialized agents to use based on the task at hand, rather than having a fixed sequence or structure. + +**Key Features:** +- Dynamic model selection based on task requirements +- Flexible workflow that adapts based on initial responses +- Ability to track which agents were used and why +- Built-in confidence scoring for quality control + +**Example:** +- An orchestrator agent receives a complex task (e.g., system architecture design) +- It breaks down the task into smaller components +- For each component, it: + - Chooses the most appropriate model (e.g., GPT-4 for reasoning, Claude for analysis) + - Delegates the work through a tool + - Evaluates the response and confidence level + - Requests additional work if needed +- Finally, it synthesizes all responses into a coherent solution + +For an implementation example, see [agent_delegation.py](agent_delegation.py). + --- These patterns were inspired by the workflow patterns described in the [Vercel AI SDK Documentation](https://sdk.vercel.ai/docs/foundations/agents#patterns-with-examples) and research from organizations like [Google Research](https://research.google/blog/chain-of-agents-large-language-models-collaborating-on-long-context-tasks/). diff --git a/examples/workflows/agent_delegation.py b/examples/workflows/agent_delegation.py new file mode 100644 index 0000000..3e0d180 --- /dev/null +++ b/examples/workflows/agent_delegation.py @@ -0,0 +1,172 @@ +""" +This example demonstrates agent delegation, where one agent (the orchestrator) can dynamically +invoke other agents through tools. This pattern is useful when you want to: + +1. Let an agent dynamically choose which specialized agents to use +2. Allow the orchestrator to adapt its strategy based on initial responses +3. Enable flexible workflows where the sequence of agent calls isn't fixed +4. Track which agents were used and why + +The example shows how to: +1. Set up a tool that allows one agent to call another +2. Structure input/output types for delegation +3. Configure the orchestrator agent with the delegation tool +4. Handle responses and track agent usage +""" + +import asyncio +from typing import List, Optional + +from pydantic import BaseModel, Field + +import workflowai +from workflowai import Model, Run + + +class DelegateInput(BaseModel): + """Input for delegating a task to a specialized agent.""" + task: str = Field(description="The task to delegate") + model: Model = Field(description="The model to use for this task") + context: Optional[str] = Field( + default=None, + description="Additional context that might help the agent", + ) + + +class DelegateOutput(BaseModel): + """Output from a delegated task.""" + response: str = Field(description="The agent's response to the task") + confidence: float = Field( + description="Confidence score between 0 and 1", + ge=0, + le=1, + ) + + +class WorkerInput(BaseModel): + """Input for the worker agent.""" + task: str = Field(description="The task to perform") + context: Optional[str] = Field( + default=None, + description="Additional context that might help with the task", + ) + + +class WorkerOutput(BaseModel): + """Output from the worker agent.""" + response: str = Field(description="The response to the task") + confidence: float = Field( + description="Confidence score between 0 and 1", + ge=0, + le=1, + ) + + +class OrchestratorInput(BaseModel): + """Input for the orchestrator agent.""" + objective: str = Field(description="The high-level objective to achieve") + requirements: List[str] = Field( + description="List of specific requirements or constraints", + min_items=1, + ) + + +class OrchestratorOutput(BaseModel): + """Final output from the orchestrator.""" + solution: str = Field(description="The final solution that meets the objective") + explanation: str = Field(description="Explanation of how the solution was derived") + agents_used: List[str] = Field( + description="List of agents/models used in the process", + default_factory=list, + ) + + +@workflowai.agent(id="worker") +async def worker_agent(input: WorkerInput, *, model: Model) -> Run[WorkerOutput]: + """ + A specialized worker agent that handles specific tasks. + + Make sure to: + 1. Focus on the specific task assigned + 2. Use the provided context if available + 3. Be clear about confidence level + 4. Explain any assumptions made + """ + ... + + +async def delegate_task(input: DelegateInput) -> DelegateOutput: + """Delegate a task to a worker agent with a specific model.""" + run = await worker_agent( + WorkerInput( + task=input.task, + context=input.context, + ), + model=input.model, + ) + return DelegateOutput( + response=run.output.response, + confidence=run.output.confidence, + ) + + +@workflowai.agent( + id="orchestrator", + model=Model.GPT_4O_LATEST, + tools=[delegate_task], +) +async def orchestrator_agent(input: OrchestratorInput) -> Run[OrchestratorOutput]: + """ + You are an expert orchestrator that breaks down complex objectives into smaller tasks + and delegates them to specialized agents. You can use the delegate_task tool to assign + work to other agents. + + Your responsibilities: + 1. Break down the objective into smaller, focused tasks + 2. Choose appropriate models for each task based on its nature: + - GPT-4O for complex reasoning or creative tasks + - Claude for analytical or structured tasks + - Gemini for technical or scientific tasks + 3. Use the delegate_task tool to assign work + 4. Evaluate responses and confidence levels + 5. Request additional work if needed + 6. Synthesize all responses into a cohesive solution + 7. Track which models were used and why + + Make sure the final solution: + - Meets all specified requirements + - Is well-reasoned and explained + - Acknowledges any limitations or uncertainties + - Lists all models/agents used in the process + """ + ... + + +async def main(): + # Example: Software architecture task + print("\nExample: Software Architecture Design") + print("-" * 50) + + result = await orchestrator_agent( + OrchestratorInput( + objective="Design a scalable microservices architecture for an e-commerce platform", + requirements=[ + "Must handle 10,000+ concurrent users", + "Include payment processing and inventory management", + "Ensure data consistency across services", + "Provide real-time order tracking", + ], + ), + ) + + print("\nSolution:") + print(result.output.solution) + print("\nExplanation:") + print(result.output.explanation) + print("\nAgents Used:") + for agent in result.output.agents_used: + print(f"- {agent}") + + +if __name__ == "__main__": + asyncio.run(main()) From 4794ea47052b1d2ea94b6242fc3937e2bbe01f30 Mon Sep 17 00:00:00 2001 From: Pierre Date: Tue, 11 Feb 2025 17:57:34 -0600 Subject: [PATCH 2/5] Update agent_delegation.py --- examples/workflows/agent_delegation.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/examples/workflows/agent_delegation.py b/examples/workflows/agent_delegation.py index 3e0d180..4cd8979 100644 --- a/examples/workflows/agent_delegation.py +++ b/examples/workflows/agent_delegation.py @@ -82,27 +82,26 @@ class OrchestratorOutput(BaseModel): @workflowai.agent(id="worker") -async def worker_agent(input: WorkerInput, *, model: Model) -> Run[WorkerOutput]: +async def worker_agent(agent_input: WorkerInput, *, model: Model) -> Run[WorkerOutput]: """ A specialized worker agent that handles specific tasks. - + Make sure to: 1. Focus on the specific task assigned - 2. Use the provided context if available - 3. Be clear about confidence level - 4. Explain any assumptions made + 2. Provide detailed reasoning for your approach + 3. Include confidence level in your response """ ... -async def delegate_task(input: DelegateInput) -> DelegateOutput: +async def delegate_task(agent_input: DelegateInput) -> DelegateOutput: """Delegate a task to a worker agent with a specific model.""" run = await worker_agent( WorkerInput( - task=input.task, - context=input.context, + task=agent_input.task, + context=agent_input.context, ), - model=input.model, + model=agent_input.model, ) return DelegateOutput( response=run.output.response, @@ -115,7 +114,7 @@ async def delegate_task(input: DelegateInput) -> DelegateOutput: model=Model.GPT_4O_LATEST, tools=[delegate_task], ) -async def orchestrator_agent(input: OrchestratorInput) -> Run[OrchestratorOutput]: +async def orchestrator_agent(agent_input: OrchestratorInput) -> Run[OrchestratorOutput]: """ You are an expert orchestrator that breaks down complex objectives into smaller tasks and delegates them to specialized agents. You can use the delegate_task tool to assign From a8682adaee342361dc9d450afa43d67195dd890d Mon Sep 17 00:00:00 2001 From: Pierre Date: Tue, 11 Feb 2025 18:02:41 -0600 Subject: [PATCH 3/5] Update agent_delegation.py fix pyright issues --- examples/workflows/agent_delegation.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/examples/workflows/agent_delegation.py b/examples/workflows/agent_delegation.py index 4cd8979..9973ba0 100644 --- a/examples/workflows/agent_delegation.py +++ b/examples/workflows/agent_delegation.py @@ -67,7 +67,7 @@ class OrchestratorInput(BaseModel): objective: str = Field(description="The high-level objective to achieve") requirements: List[str] = Field( description="List of specific requirements or constraints", - min_items=1, + default_factory=list, ) @@ -81,8 +81,7 @@ class OrchestratorOutput(BaseModel): ) -@workflowai.agent(id="worker") -async def worker_agent(agent_input: WorkerInput, *, model: Model) -> Run[WorkerOutput]: +async def worker_agent(agent_input: WorkerInput) -> Run[WorkerOutput]: """ A specialized worker agent that handles specific tasks. @@ -96,12 +95,13 @@ async def worker_agent(agent_input: WorkerInput, *, model: Model) -> Run[WorkerO async def delegate_task(agent_input: DelegateInput) -> DelegateOutput: """Delegate a task to a worker agent with a specific model.""" - run = await worker_agent( + # Create a new worker agent with the specified model + worker = workflowai.agent(id="worker", model=agent_input.model)(worker_agent) + run = await worker( WorkerInput( task=agent_input.task, context=agent_input.context, ), - model=agent_input.model, ) return DelegateOutput( response=run.output.response, From b8c6954ce08b3ce879746cff351e62c825fca3ab Mon Sep 17 00:00:00 2001 From: Pierre Date: Thu, 13 Feb 2025 19:22:56 -0600 Subject: [PATCH 4/5] Update agent_delegation.py --- examples/workflows/agent_delegation.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/examples/workflows/agent_delegation.py b/examples/workflows/agent_delegation.py index 9973ba0..072af04 100644 --- a/examples/workflows/agent_delegation.py +++ b/examples/workflows/agent_delegation.py @@ -20,7 +20,7 @@ from pydantic import BaseModel, Field import workflowai -from workflowai import Model, Run +from workflowai import Model class DelegateInput(BaseModel): @@ -80,8 +80,8 @@ class OrchestratorOutput(BaseModel): default_factory=list, ) - -async def worker_agent(agent_input: WorkerInput) -> Run[WorkerOutput]: +@workflowai.agent() +async def worker_agent(agent_input: WorkerInput) -> WorkerOutput: """ A specialized worker agent that handles specific tasks. @@ -95,13 +95,13 @@ async def worker_agent(agent_input: WorkerInput) -> Run[WorkerOutput]: async def delegate_task(agent_input: DelegateInput) -> DelegateOutput: """Delegate a task to a worker agent with a specific model.""" - # Create a new worker agent with the specified model - worker = workflowai.agent(id="worker", model=agent_input.model)(worker_agent) - run = await worker( + # Run the worker agent with the specified model + run = await worker_agent.run( WorkerInput( task=agent_input.task, context=agent_input.context, ), + model=agent_input.model, ) return DelegateOutput( response=run.output.response, @@ -114,7 +114,7 @@ async def delegate_task(agent_input: DelegateInput) -> DelegateOutput: model=Model.GPT_4O_LATEST, tools=[delegate_task], ) -async def orchestrator_agent(agent_input: OrchestratorInput) -> Run[OrchestratorOutput]: +async def orchestrator_agent(agent_input: OrchestratorInput) -> OrchestratorOutput: """ You are an expert orchestrator that breaks down complex objectives into smaller tasks and delegates them to specialized agents. You can use the delegate_task tool to assign @@ -146,7 +146,7 @@ async def main(): print("\nExample: Software Architecture Design") print("-" * 50) - result = await orchestrator_agent( + result = await orchestrator_agent.run( OrchestratorInput( objective="Design a scalable microservices architecture for an e-commerce platform", requirements=[ From e1f9e07f14d1a6f80595448bc4f2a38a796af7a5 Mon Sep 17 00:00:00 2001 From: Guillaume Aquilina Date: Fri, 14 Feb 2025 09:39:12 -0500 Subject: [PATCH 5/5] fix: lint List --- examples/workflows/agent_delegation.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/examples/workflows/agent_delegation.py b/examples/workflows/agent_delegation.py index 072af04..91630e0 100644 --- a/examples/workflows/agent_delegation.py +++ b/examples/workflows/agent_delegation.py @@ -15,7 +15,7 @@ """ import asyncio -from typing import List, Optional +from typing import Optional from pydantic import BaseModel, Field @@ -25,6 +25,7 @@ class DelegateInput(BaseModel): """Input for delegating a task to a specialized agent.""" + task: str = Field(description="The task to delegate") model: Model = Field(description="The model to use for this task") context: Optional[str] = Field( @@ -35,6 +36,7 @@ class DelegateInput(BaseModel): class DelegateOutput(BaseModel): """Output from a delegated task.""" + response: str = Field(description="The agent's response to the task") confidence: float = Field( description="Confidence score between 0 and 1", @@ -45,6 +47,7 @@ class DelegateOutput(BaseModel): class WorkerInput(BaseModel): """Input for the worker agent.""" + task: str = Field(description="The task to perform") context: Optional[str] = Field( default=None, @@ -54,6 +57,7 @@ class WorkerInput(BaseModel): class WorkerOutput(BaseModel): """Output from the worker agent.""" + response: str = Field(description="The response to the task") confidence: float = Field( description="Confidence score between 0 and 1", @@ -64,8 +68,9 @@ class WorkerOutput(BaseModel): class OrchestratorInput(BaseModel): """Input for the orchestrator agent.""" + objective: str = Field(description="The high-level objective to achieve") - requirements: List[str] = Field( + requirements: list[str] = Field( description="List of specific requirements or constraints", default_factory=list, ) @@ -73,13 +78,15 @@ class OrchestratorInput(BaseModel): class OrchestratorOutput(BaseModel): """Final output from the orchestrator.""" + solution: str = Field(description="The final solution that meets the objective") explanation: str = Field(description="Explanation of how the solution was derived") - agents_used: List[str] = Field( + agents_used: list[str] = Field( description="List of agents/models used in the process", default_factory=list, ) + @workflowai.agent() async def worker_agent(agent_input: WorkerInput) -> WorkerOutput: """