diff --git a/examples/email_reply_agent.ipynb b/examples/email_reply_agent.ipynb index ef56c5c7..f7f4f40b 100644 --- a/examples/email_reply_agent.ipynb +++ b/examples/email_reply_agent.ipynb @@ -133,6 +133,7 @@ "source": [ "# import\n", "from langchain_chroma import Chroma\n", + "from langchain_openai import OpenAIEmbeddings\n", "from langchain_community.document_loaders import TextLoader\n", "from langchain_community.embeddings.sentence_transformer import (\n", " SentenceTransformerEmbeddings,\n", @@ -148,7 +149,7 @@ "docs = text_splitter.split_documents(documents)\n", "\n", "# create the open-source embedding function\n", - "embedding_function = SentenceTransformerEmbeddings(model_name=\"all-MiniLM-L6-v2\")\n", + "embedding_function = OpenAIEmbeddings()\n", "\n", "# load it into Chroma\n", "db = Chroma.from_documents(docs, embedding_function)" diff --git a/examples/hierarchical_blogging_team.py b/examples/hierarchical_blogging_team.py index 6651fbc3..9467af90 100644 --- a/examples/hierarchical_blogging_team.py +++ b/examples/hierarchical_blogging_team.py @@ -12,12 +12,14 @@ name: blogging-team team: name: BloggingTeam - supervisor: - name: supervisor + router: + name: parent-supervisor + kind: supervisor subteams: - - name: BloggingTeam - supervisor: - name: supervisor + - name: BlogResearchTeam + router: + name: bgsupervisor + kind: supervisor agents: - name: Reasercher job: Do a research on the internet and find articles of relevent to the topic asked by the user, always try to find the latest information on the same @@ -27,9 +29,10 @@ job: From the documents provider by the researcher write a blog of 300 words with can be readily published, make in engaging and add reference links to original blogs tools: - name: TavilySearchResults - - name: WritingTeam - supervisor: - name: supervisor + - name: BlogWritingTeam + router: + name: bwsupervisor + kind: supervisor agents: - name: Figure job: Do somethinh @@ -46,4 +49,4 @@ session = FloSession(llm).register_tool( name="TavilySearchResults", tool=TavilySearchResults() ) -flo: Flo = Flo.build(session, llm, yaml=yaml_data) +flo: Flo = Flo.build(session, yaml=yaml_data) diff --git a/flo_ai/builders/yaml_builder.py b/flo_ai/builders/yaml_builder.py index e9838a64..15dd0aea 100644 --- a/flo_ai/builders/yaml_builder.py +++ b/flo_ai/builders/yaml_builder.py @@ -22,7 +22,6 @@ def build_supervised_team(session: FloSession) -> ExecutableFlo: def validate_team(name_set: set, team_config: TeamConfig): validate_names(name_set, team_config.name) - [validate_names(name_set, agent.name) for agent in team_config.agents] def parse_and_build_subteams(session: FloSession, team_config: TeamConfig, name_set = set()) -> ExecutableFlo: flo_team = None diff --git a/flo_ai/models/flo_executable.py b/flo_ai/models/flo_executable.py index 508bdde2..f5a76e65 100644 --- a/flo_ai/models/flo_executable.py +++ b/flo_ai/models/flo_executable.py @@ -10,6 +10,8 @@ class ExecutableType(Enum): tool = "tool" reflection = "reflection" delegator = "delegator" + team = "team" + router = "router" @staticmethod def isAgent(type: 'ExecutableType'): @@ -26,7 +28,7 @@ class ExecutableFlo(FloMember): def __init__(self, name: str, runnable: Runnable, - type: str = "team") -> None: + type: str = ExecutableType.team) -> None: super().__init__(name, type) self.runnable = runnable diff --git a/flo_ai/models/flo_node.py b/flo_ai/models/flo_node.py index ea302f11..5c1dfd84 100644 --- a/flo_ai/models/flo_node.py +++ b/flo_ai/models/flo_node.py @@ -31,13 +31,22 @@ def build_from_team(self, flo_team: FloRoutedTeam) -> 'FloNode': return FloNode(( FloNode.Builder.__get_last_message | team_chain | FloNode.Builder.__join_graph ), flo_team.name, flo_team.type, flo_team.config) + + def build_from_router(self, flo_router) -> 'FloNode': + router_func = functools.partial(FloNode.Builder.__teamflo_router_node, agent=flo_router.executor, name=flo_router.router_name, agent_config=flo_router.config) + return FloNode(router_func, flo_router.router_name, flo_router.type, flo_router.config) @staticmethod def __teamflo_agent_node(state: TeamFloAgentState, agent: AgentExecutor, name: str, agent_config: AgentConfig): result = agent.invoke(state) - # TODO see how to fix this output = result if isinstance(result, str) else result["output"] return { STATE_NAME_MESSAGES: [HumanMessage(content=output, name=name)] } + + @staticmethod + def __teamflo_router_node(state: TeamFloAgentState, agent: AgentExecutor, name: str, agent_config: AgentConfig): + result = agent.invoke(state) + nextNode = result if isinstance(result, str) else result["next"] + return { "next": nextNode } @staticmethod def __get_last_message(state: TeamFloAgentState) -> str: diff --git a/flo_ai/router/flo_custom_router.py b/flo_ai/router/flo_custom_router.py index 05b5072e..139ffb59 100644 --- a/flo_ai/router/flo_custom_router.py +++ b/flo_ai/router/flo_custom_router.py @@ -33,21 +33,21 @@ def router_fn(state: TeamFloAgentState): function_def = { - "name": "route", - "description": "Select the next role.", - "parameters": { - "title": "routeSchema", - "type": "object", - "properties": { - "next": { - "title": "Next", - "anyOf": [ - {"enum": members}, - ], - } - }, - "required": ["next"], - } + "name": "route", + "description": "Select the next role.", + "parameters": { + "title": "routeSchema", + "type": "object", + "properties": { + "next": { + "title": "Next", + "anyOf": [ + {"enum": members}, + ], + } + }, + "required": ["next"], + } } chain = prompt | self.llm.bind_functions(functions=[function_def], function_call="route") | JsonOutputFunctionsParser() @@ -60,7 +60,7 @@ def router_fn(state: TeamFloAgentState): return router_fn - def build_agent_graph(self): + def build_graph(self): flo_agent_nodes = [self.build_node(flo_agent) for flo_agent in self.members] workflow = StateGraph(TeamFloAgentState) @@ -90,36 +90,6 @@ def build_agent_graph(self): return FloRoutedTeam(self.flo_team.name, workflow_graph, self.flo_team.config) - def build_team_graph(self): - flo_team_entry_chains = [self.build_node_for_teams(flo_agent) for flo_agent in self.members] - - super_graph = StateGraph(TeamFloAgentState) - - for flo_team_chain in flo_team_entry_chains: - agent_name = flo_team_chain.name - super_graph.add_node(agent_name, flo_team_chain.func) - - router_config = self.router_config - super_graph.add_edge(START, router_config.start_node) - for edge_config in router_config.edges: - edge = edge_config.edge - if len(edge) > 2: - teams = edge[1:] - router = self.build_router_fn(teams, edge_config.rule) - super_graph.add_conditional_edges(edge[0], router, {item: item for item in teams}) - else: - super_graph.add_edge(edge[0], edge[1]) - - if isinstance(router_config.end_node, list): - for node in router_config.end_node: - super_graph.add_edge(node, END) - else: - super_graph.add_edge(router_config.end_node, END) - - workflow_graph = super_graph.compile() - - return FloRoutedTeam(self.flo_team.name, workflow_graph, self.flo_team.config) - class Builder(): def __init__(self, session: FloSession, config: TeamConfig, flo_team: FloTeam,) -> None: diff --git a/flo_ai/router/flo_linear.py b/flo_ai/router/flo_linear.py index 1d40addd..19865ca3 100644 --- a/flo_ai/router/flo_linear.py +++ b/flo_ai/router/flo_linear.py @@ -14,7 +14,7 @@ def __init__(self, session: FloSession, config: TeamConfig, flo_team: FloTeam): flo_team=flo_team, executor=None, config=config) self.router_config = config.router - def build_agent_graph(self): + def build_graph(self): flo_agent_nodes = [self.build_node(member) for member in self.members] workflow = StateGraph(TeamFloAgentState) @@ -31,7 +31,6 @@ def build_agent_graph(self): parent_node = flo_agent_nodes[i] child_node = flo_agent_nodes[i+1] next_node = flo_agent_nodes[i+2] if (i+2) < len(flo_agent_nodes) else END - if (parent_node.kind == ExecutableType.reflection): self.add_reflection_edge(workflow, parent_node, child_node) continue @@ -56,33 +55,6 @@ def build_agent_graph(self): return FloRoutedTeam(self.flo_team.name, workflow_graph, self.flo_team.config) - def build_team_graph(self): - flo_team_entry_chains = [self.build_node_for_teams(flo_agent) for flo_agent in self.members] - # Define the graph. - super_graph = StateGraph(TeamFloAgentState) - # First add the nodes, which will do the work - for flo_team_chain in flo_team_entry_chains: - agent_name = flo_team_chain.name - super_graph.add_node(agent_name, flo_team_chain.func) - - if self.router_config.edges is None: - start_node_name = flo_team_entry_chains[0].name - end_node_name = flo_team_entry_chains[-1].name - super_graph.add_edge(START, start_node_name) - for i in range(len(flo_team_entry_chains) - 1): - agent1_name = flo_team_entry_chains[i].name - agent2_name = flo_team_entry_chains[i+1].name - super_graph.add_edge(agent1_name, agent2_name) - super_graph.add_edge(end_node_name, END) - else: - super_graph.add_edge(START, self.router_config.start_node) - for edge in self.router_config.edges: - super_graph.add_edge(edge[0], edge[1]) - super_graph.add_edge(self.router_config.end_node, END) - - super_graph = super_graph.compile() - return FloRoutedTeam(self.flo_team.name, super_graph, self.flo_team.config) - class Builder(): def __init__(self, session: FloSession, config: TeamConfig, flo_team: FloTeam,) -> None: diff --git a/flo_ai/router/flo_llm_router.py b/flo_ai/router/flo_llm_router.py index c06945bf..ffcdb58d 100644 --- a/flo_ai/router/flo_llm_router.py +++ b/flo_ai/router/flo_llm_router.py @@ -1,7 +1,6 @@ -from langchain_core.output_parsers.openai_functions import JsonOutputFunctionsParser +from typing import Union from langchain_core.language_models import BaseLanguageModel from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder -from typing import Union from langchain_core.runnables import Runnable from flo_ai.state.flo_session import FloSession from flo_ai.constants.prompt_constants import FLO_FINISH @@ -11,6 +10,11 @@ from langgraph.graph import StateGraph from flo_ai.state.flo_state import TeamFloAgentState from flo_ai.yaml.config import TeamConfig +from langchain_core.output_parsers import JsonOutputParser +from langchain_core.pydantic_v1 import BaseModel, Field + +class NextAgent(BaseModel): + next: str = Field(description="Name of the next member to be called") class StateUpdateComponent: def __init__(self, name: str, session: FloSession) -> None: @@ -35,37 +39,19 @@ def __init__(self, executor = executor ) - def build_agent_graph(self): + def build_graph(self): flo_agent_nodes = [self.build_node(flo_agent) for flo_agent in self.members] workflow = StateGraph(TeamFloAgentState) for flo_agent_node in flo_agent_nodes: workflow.add_node(flo_agent_node.name, flo_agent_node.func) - workflow.add_node(self.router_name, self.executor) + workflow.add_node(self.router_name, self.build_node(self).func) for member in self.member_names: workflow.add_edge(member, self.router_name) workflow.add_conditional_edges(self.router_name, self.router_fn) workflow.set_entry_point(self.router_name) workflow_graph = workflow.compile() return FloRoutedTeam(self.flo_team.name, workflow_graph, self.flo_team.config) - - def build_team_graph(self): - flo_team_entry_chains = [self.build_node_for_teams(flo_agent) for flo_agent in self.members] - # Define the graph. - super_graph = StateGraph(TeamFloAgentState) - # First add the nodes, which will do the work - for flo_team_chain in flo_team_entry_chains: - super_graph.add_node(flo_team_chain.name, flo_team_chain.func) - super_graph.add_node(self.router_name, self.executor) - - for member in self.member_names: - super_graph.add_edge(member, self.router_name) - - super_graph.add_conditional_edges(self.router_name, self.router_fn) - - super_graph.set_entry_point(self.router_name) - super_graph = super_graph.compile() - return FloRoutedTeam(self.flo_team.name, super_graph, self.flo_team.config) - + class Builder: def __init__(self, session: FloSession, @@ -89,6 +75,7 @@ def __init__(self, " respond with the worker to act next " ) + self.parser = JsonOutputParser(pydantic_object=NextAgent) self.llm_router_prompt = ChatPromptTemplate.from_messages( [ ("system", router_base_system_message), @@ -97,38 +84,25 @@ def __init__(self, ( "system", "Given the conversation above, who should act next?" - " Or should we FINISH if the task is already answered ? Select one of: {options}", + " Or should we FINISH if the task is already answered ? Select one of: {options} \n {format_instructions}", ), ] - ).partial(options=str(self.options), members=", ".join(self.members), member_type=member_type, router_prompt=router_prompt) + ).partial( + options=str(self.options), + members=", ".join(self.members), + member_type=member_type, + router_prompt=router_prompt, + format_instructions=self.parser.get_format_instructions() + ) - def build(self): - function_def = { - "name": "route", - "description": "Select the next role.", - "parameters": { - "title": "routeSchema", - "type": "object", - "properties": { - "next": { - "title": "Next", - "anyOf": [ - {"enum": self.options}, - ], - } - }, - "required": ["next"], - } - } - + def build(self): chain = ( self.llm_router_prompt - | self.llm.bind_functions(functions=[function_def], function_call="route") - | JsonOutputFunctionsParser() - | StateUpdateComponent(self.name, self.session) + | self.llm + | self.parser ) - return FloLLMRouter(executor = chain, + return FloLLMRouter(executor=chain, flo_team=self.flo_team, name=self.name, session=self.session) \ No newline at end of file diff --git a/flo_ai/router/flo_router.py b/flo_ai/router/flo_router.py index 4dce706d..c6e16db0 100644 --- a/flo_ai/router/flo_router.py +++ b/flo_ai/router/flo_router.py @@ -2,6 +2,7 @@ from abc import ABC, abstractmethod from flo_ai.state.flo_session import FloSession from flo_ai.models.flo_team import FloTeam +from flo_ai.models.flo_member import FloMember from flo_ai.yaml.config import TeamConfig from flo_ai.models.flo_routed_team import FloRoutedTeam from flo_ai.models.flo_agent import FloAgent @@ -9,7 +10,6 @@ from flo_ai.models.flo_node import FloNode from flo_ai.constants.prompt_constants import FLO_FINISH from langgraph.graph import END,StateGraph -from flo_ai.models.flo_node import FloNode from flo_ai.models.flo_executable import ExecutableType import functools from typing import Union @@ -24,32 +24,27 @@ def __init__(self, session: FloSession, name: str, flo_team: FloTeam, executor, self.flo_team: FloTeam = flo_team self.members = flo_team.members self.member_names = [x.name for x in flo_team.members] - self.type: ExecutableType = flo_team.members[0].type + self.type: ExecutableType = ExecutableType.router self.executor = executor self.config = config - - def is_agent_supervisor(self): - return ExecutableType.isAgent(self.type) def build_routed_team(self) -> FloRoutedTeam: - if self.is_agent_supervisor(): - return self.build_agent_graph() - else: - return self.build_team_graph() - - @abstractmethod - def build_agent_graph(): - pass + return self.build_graph() @abstractmethod - def build_team_graph(): + def build_graph(): pass - def build_node(self, flo_agent: FloAgent) -> FloNode: - if (flo_agent.type == ExecutableType.delegator): - return FloNode(flo_agent.executor, flo_agent.name, flo_agent.type, flo_agent.config) + def build_node(self, flo_member: FloMember) -> FloNode: node_builder = FloNode.Builder() - return node_builder.build_from_agent(flo_agent) + if flo_member.type == ExecutableType.router: + return node_builder.build_from_router(flo_member) + if (flo_member.type == ExecutableType.team): + return node_builder.build_from_team(flo_member) + if (flo_member.type == ExecutableType.delegator): + return FloNode(flo_member.executor, flo_member.name, flo_member.type, flo_member.config) + node_builder = FloNode.Builder() + return node_builder.build_from_agent(flo_member) def router_fn(self, state: TeamFloAgentState): next = state["next"] @@ -59,26 +54,12 @@ def router_fn(self, state: TeamFloAgentState): if self.session.is_looping(node=next): return conditional_map[FLO_FINISH] return conditional_map[next] - - def build_node_for_teams(self, flo_team: FloRoutedTeam): - node_builder = FloNode.Builder() - return node_builder.build_from_team(flo_team) def update_reflection_state(self, state: TeamFloAgentState, reflection_agent_name: str): - tracker = None - if STATE_NAME_LOOP_CONTROLLER not in state or state[STATE_NAME_LOOP_CONTROLLER] is None: - tracker = dict() - else: - tracker = state[STATE_NAME_LOOP_CONTROLLER] - - if reflection_agent_name in tracker: - tracker[reflection_agent_name] += 1 - else: - tracker[reflection_agent_name] = 1 - - return { - STATE_NAME_LOOP_CONTROLLER: tracker - } + tracker = state.get(STATE_NAME_LOOP_CONTROLLER) or {} + tracker[reflection_agent_name] = tracker.get(reflection_agent_name, 0) + 1 + return {STATE_NAME_LOOP_CONTROLLER: tracker} + def add_delegation_edge(self, workflow: StateGraph, parent: FloNode, delegation_node: FloNode, nextNode: Union[FloNode|str]): to_agent_names = [x.name for x in delegation_node.config.to] diff --git a/flo_ai/router/flo_supervisor.py b/flo_ai/router/flo_supervisor.py index a888f75b..2260dd16 100644 --- a/flo_ai/router/flo_supervisor.py +++ b/flo_ai/router/flo_supervisor.py @@ -1,4 +1,3 @@ -from langchain_core.output_parsers.openai_functions import JsonOutputFunctionsParser from langchain_core.language_models import BaseLanguageModel from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from typing import Union @@ -8,8 +7,9 @@ from flo_ai.router.flo_llm_router import FloLLMRouter, StateUpdateComponent from flo_ai.models.flo_team import FloTeam from flo_ai.yaml.config import TeamConfig +from langchain_core.output_parsers import JsonOutputParser +from flo_ai.router.flo_llm_router import NextAgent -# TODO, maybe add description about what team members can do supervisor_system_message = ( "You are a supervisor tasked with managing a conversation between the" " following {member_type}: {members}. Given the following user request," @@ -46,6 +46,7 @@ def __init__(self, self.members = [agent.name for agent in flo_team.members] self.options = self.members + [FLO_FINISH] member_type = "workers" if flo_team.members[0].type == "agent" else "team members" + self.parser = JsonOutputParser(pydantic_object=NextAgent) self.supervisor_prompt = ChatPromptTemplate.from_messages( [ ("system", supervisor_system_message), @@ -53,38 +54,24 @@ def __init__(self, ( "system", "Given the conversation above, who should act next?" - " Or should we FINISH if the task is already answered, Select one of: {options}", + " Or should we FINISH if the task is already answered, Select one of: {options} \n {format_instructions}", ), ] - ).partial(options=str(self.options), members=", ".join(self.members), member_type=member_type) + ).partial( + options=str(self.options), + members=", ".join(self.members), + member_type=member_type, + format_instructions=self.parser.get_format_instructions() + ) def build(self): - function_def = { - "name": "route", - "description": "Select the next role.", - "parameters": { - "title": "routeSchema", - "type": "object", - "properties": { - "next": { - "title": "Next", - "anyOf": [ - {"enum": self.options}, - ], - } - }, - "required": ["next"], - } - } - chain = ( self.supervisor_prompt - | self.llm.bind_functions(functions=[function_def], function_call="route") - | JsonOutputFunctionsParser() - | StateUpdateComponent(self.name, self.session) + | self.llm + | self.parser ) - return FloSupervisor(executor = chain, + return FloSupervisor(executor=chain, flo_team=self.flo_team, name=self.name, session=self.session) \ No newline at end of file