diff --git a/.gitignore b/.gitignore index 0b58bd0e..25d51645 100644 --- a/.gitignore +++ b/.gitignore @@ -11,4 +11,5 @@ dataset bin .DS_Store *.sql -*.log \ No newline at end of file +*.log +*.yaml \ No newline at end of file diff --git a/examples/python/output_parser.py b/examples/python/output_parser.py new file mode 100644 index 00000000..c07e7d0f --- /dev/null +++ b/examples/python/output_parser.py @@ -0,0 +1,62 @@ +import os +from flo_ai import FloAgent, FloSession, Flo +from langchain_community.tools.tavily_search.tool import TavilySearchResults +from dotenv import load_dotenv +from langchain_openai import AzureChatOpenAI +from flo_ai.parsers.flo_json_parser import FloJsonParser +from flo_ai.state.flo_kv_collector import FloKVCollector + +load_dotenv() +llm = AzureChatOpenAI( + azure_endpoint=os.getenv('AZURE_GPT4_ENDPOINT'), + model_name='gpt-4o', + temperature=0.2, + max_tokens=4096, + api_version='2024-08-01-preview', + api_key=os.getenv('AZURE_OPEN_AI_API_KEY'), +) + +session = FloSession(llm).register_tool( + name='TavilySearchResults', tool=TavilySearchResults() +) + +format = { + 'name': 'NameFormat', + 'fields': [ + { + 'type': 'str', + 'description': 'The first name of the person', + 'name': 'first_name', + }, + { + 'type': 'str', + 'description': 'The middle name of the person', + 'name': 'middle_name', + }, + { + 'type': 'str', + 'description': 'The last name of the person', + 'name': 'last_name', + }, + ], +} + +dc = FloKVCollector() + +researcher = FloAgent.create( + session, + name='Researcher', + role='Internet Researcher', + job='What is the first name, last name and middle name of the the person user asks about', + tools=[TavilySearchResults()], + parser=FloJsonParser.create(json_dict=format), + data_collector=dc, +) + + +Flo.set_log_level('DEBUG') +flo: Flo = Flo.create(session, researcher) +result = flo.invoke('Mahatma Gandhi') + +print(result) +print(dc.fetch()) diff --git a/examples/python/output_parser_yaml.py b/examples/python/output_parser_yaml.py new file mode 100644 index 00000000..b350d330 --- /dev/null +++ b/examples/python/output_parser_yaml.py @@ -0,0 +1,52 @@ +import os +from flo_ai import FloSession, Flo +from langchain_community.tools.tavily_search.tool import TavilySearchResults +from dotenv import load_dotenv +from langchain_openai import AzureChatOpenAI +from flo_ai.state.flo_kv_collector import FloKVCollector + +load_dotenv() +llm = AzureChatOpenAI( + azure_endpoint=os.getenv('AZURE_GPT4_ENDPOINT'), + model_name='gpt-4o', + temperature=0.2, + max_tokens=4096, + api_version='2024-08-01-preview', + api_key=os.getenv('AZURE_OPEN_AI_API_KEY'), +) + +session = FloSession(llm).register_tool( + name='InternetSearchTool', tool=TavilySearchResults() +) + +dc = FloKVCollector() + +session.register_data_collector('kv', dc) + +simple_reseacher = """ +apiVersion: flo/alpha-v1 +kind: FloAgent +name: weather-assistant +agent: + name: WeatherAssistant + kind: agentic + job: > + Given the person name, guess the first and last name + tools: + - name: InternetSearchTool + parser: + name: NameFormatter + fields: + - type: str + description: The first name of the person + name: first_name + - type: str + description: The first name of the person + name: last_name + data_collector: kv +""" + +flo: Flo = Flo.build(session, simple_reseacher) +result = flo.invoke('Gandhi') + +print(dc.fetch()) diff --git a/flo_ai/common/flo_logger.py b/flo_ai/common/flo_logger.py index eadc2ebe..a1610b57 100644 --- a/flo_ai/common/flo_logger.py +++ b/flo_ai/common/flo_logger.py @@ -66,7 +66,7 @@ def setLevel(self, level: Union[str, int]) -> None: handler.setLevel(level) def _log( - self, level: int, msg: str, session: Optional[str] = None, *args, **kwargs + self, level: int, msg: str, session: Optional[Any] = None, *args, **kwargs ): if not self.isEnabledFor(level): return diff --git a/flo_ai/core.py b/flo_ai/core.py index 79b8f9bb..5d7c6269 100644 --- a/flo_ai/core.py +++ b/flo_ai/core.py @@ -18,6 +18,7 @@ set_logger_internal, FloLogConfig, ) +from flo_ai.models.flo_node import FloNode from flo_ai.models.flo_agent import FloAgent from langchain.tools import StructuredTool @@ -84,6 +85,8 @@ def build( executable: ExecutableFlo = build_supervised_team( session, to_supervised_team(yaml) ) + if isinstance(executable, FloAgent): + executable = FloNode.Builder(session).build_from_agent(executable) return Flo(session, executable) if routed_team is not None: return Flo(session, routed_team.build_routed_team()) @@ -91,11 +94,10 @@ def build( @staticmethod def create(session: FloSession, routed_team: Union[FloRouter, FloAgent]): - runnable = ( - routed_team.build_routed_team() - if isinstance(routed_team, FloRouter) - else routed_team - ) + if isinstance(routed_team, FloRouter): + runnable = routed_team.build_routed_team() + else: + runnable = FloNode.Builder(session).build_from_agent(routed_team) return Flo(session, runnable) @staticmethod diff --git a/flo_ai/factory/agent_factory.py b/flo_ai/factory/agent_factory.py index 5ed8940c..6e271353 100644 --- a/flo_ai/factory/agent_factory.py +++ b/flo_ai/factory/agent_factory.py @@ -1,6 +1,7 @@ +import json from typing import Optional from flo_ai.state.flo_session import FloSession -from flo_ai.yaml.config import AgentConfig +from flo_ai.yaml.config import AgentConfig, Parser from flo_ai.models.flo_agent import FloAgent from flo_ai.models.flo_llm_agent import FloLLMAgent from flo_ai.models.flo_reflection_agent import FloReflectionAgent @@ -10,6 +11,7 @@ from flo_ai.models.delegate import Delegate from flo_ai.constants.common_constants import DOCUMENTATION_AGENT_ANCHOR from enum import Enum +from flo_ai.parsers.flo_json_parser import FloJsonParser class AgentKinds(Enum): @@ -59,7 +61,18 @@ def __create_agentic_agent( session: FloSession, agent: AgentConfig, tool_map ) -> FloAgent: agent_model = AgentFactory.__resolve_model(session, agent.model) + dc = ( + session.data_collectors[agent.data_collector] + if agent.data_collector is not None + else None + ) tools = [tool_map[tool.name] for tool in agent.tools] + if isinstance(agent.parser, Parser): + parser = FloJsonParser.create( + json_dict=json.loads(agent.parser.model_dump_json()) + ) + else: + parser = session.parsers[agent.parser] if agent.parser is not None else None flo_agent: FloAgent = FloAgent.Builder( session, name=agent.name, @@ -69,12 +82,25 @@ def __create_agentic_agent( llm=agent_model, on_error=session.on_agent_error, model_name=agent.model, + parser=parser, + data_collector=dc, ).build() return flo_agent @staticmethod def __create_llm_agent(session: FloSession, agent: AgentConfig) -> FloLLMAgent: agent_model = AgentFactory.__resolve_model(session, agent.model) + dc = ( + session.data_collectors[agent.data_collector] + if agent.data_collector is not None + else None + ) + if isinstance(agent.parser, Parser): + parser = FloJsonParser.create( + json_dict=json.loads(agent.parser.model_dump_json()) + ) + else: + parser = session.parsers[agent.parser] if agent.parser is not None else None builder = FloLLMAgent.Builder( session, name=agent.name, @@ -82,6 +108,8 @@ def __create_llm_agent(session: FloSession, agent: AgentConfig) -> FloLLMAgent: role=agent.role, llm=agent_model, model_name=agent.model, + parser=parser, + data_collector=dc, ) llm_agent: FloLLMAgent = builder.build() return llm_agent diff --git a/flo_ai/models/flo_agent.py b/flo_ai/models/flo_agent.py index 89ddc773..e8ee08a0 100644 --- a/flo_ai/models/flo_agent.py +++ b/flo_ai/models/flo_agent.py @@ -4,10 +4,11 @@ from langchain_core.runnables import Runnable from langchain_core.language_models import BaseLanguageModel from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder -from flo_ai.models.flo_executable import ExecutableFlo +from flo_ai.models.flo_executable import ExecutableFlo, ExecutableType from flo_ai.state.flo_session import FloSession from typing import Union, Optional, Callable -from flo_ai.models.flo_executable import ExecutableType +from flo_ai.state.flo_data_collector import FloDataCollector +from flo_ai.parsers.flo_parser import FloParser class FloAgent(ExecutableFlo): @@ -17,11 +18,13 @@ def __init__( agent: Runnable, executor: AgentExecutor, model_name: str, + data_collector: Optional[FloDataCollector] = None, ) -> None: super().__init__(name, executor, ExecutableType.agentic) self.model_name = model_name self.agent: Runnable = (agent,) self.executor: AgentExecutor = executor + self.data_collector = data_collector @staticmethod def create( @@ -32,6 +35,8 @@ def create( role: Optional[str] = None, on_error: Union[str, Callable] = True, llm: Union[BaseLanguageModel, None] = None, + parser: Optional[FloParser] = None, + data_collector: Optional[FloDataCollector] = None, ): model_name = 'default' if llm is None else llm.name return FloAgent.Builder( @@ -43,6 +48,8 @@ def create( on_error=on_error, llm=llm, model_name=model_name, + parser=parser, + data_collector=data_collector, ).build() class Builder: @@ -57,6 +64,8 @@ def __init__( llm: Union[BaseLanguageModel, None] = None, on_error: Union[str, Callable] = True, model_name: Union[str, None] = 'default', + parser: Optional[FloParser] = None, + data_collector: Optional[FloDataCollector] = None, ) -> None: prompt: Union[ChatPromptTemplate, str] = job self.name: str = name @@ -67,6 +76,8 @@ def __init__( if role is not None else [('system', prompt)] ) + if parser is not None: + system_prompts.append('\n{format_instructions}') system_prompts.append(MessagesPlaceholder(variable_name='messages')) system_prompts.append(MessagesPlaceholder(variable_name='agent_scratchpad')) self.prompt: ChatPromptTemplate = ( @@ -74,9 +85,14 @@ def __init__( if isinstance(prompt, str) else prompt ) + if parser is not None: + self.prompt = self.prompt.partial( + format_instructions=parser.get_format_instructions() + ) self.tools: list[BaseTool] = tools self.verbose = verbose self.on_error = on_error + self.data_collector = data_collector def build(self) -> AgentExecutor: agent = create_tool_calling_agent(self.llm, self.tools, self.prompt) @@ -87,4 +103,10 @@ def build(self) -> AgentExecutor: return_intermediate_steps=True, handle_parsing_errors=self.on_error, ) - return FloAgent(self.name, agent, executor, model_name=self.model_name) + return FloAgent( + self.name, + agent, + executor, + model_name=self.model_name, + data_collector=self.data_collector, + ) diff --git a/flo_ai/models/flo_llm_agent.py b/flo_ai/models/flo_llm_agent.py index 1b799012..05afb872 100644 --- a/flo_ai/models/flo_llm_agent.py +++ b/flo_ai/models/flo_llm_agent.py @@ -6,13 +6,22 @@ from typing import Union, Optional from langchain_core.output_parsers import StrOutputParser from flo_ai.models.flo_executable import ExecutableType +from flo_ai.parsers.flo_parser import FloParser +from flo_ai.state.flo_data_collector import FloDataCollector class FloLLMAgent(ExecutableFlo): - def __init__(self, name: str, executor: Runnable, model_name: str) -> None: + def __init__( + self, + name: str, + executor: Runnable, + model_name: str, + data_collector: Optional[FloDataCollector] = None, + ) -> None: super().__init__(name, executor, ExecutableType.llm) self.executor: Runnable = executor self.model_name: str = model_name + self.data_collector = data_collector @staticmethod def create( @@ -21,6 +30,8 @@ def create( job: str, role: Optional[str] = None, llm: Union[BaseLanguageModel, None] = None, + parser: Optional[FloParser] = None, + data_collector: Optional[FloDataCollector] = None, ): model_name = 'default' if llm is None else llm.name return FloLLMAgent.Builder( @@ -30,6 +41,8 @@ def create( role=role, llm=llm, model_name=model_name, + parser=parser, + data_collector=data_collector, ).build() class Builder: @@ -41,25 +54,35 @@ def __init__( role: Optional[str] = None, llm: Union[BaseLanguageModel, None] = None, model_name: str = None, + parser: Optional[FloParser] = None, + data_collector: Optional[FloDataCollector] = None, ) -> None: self.model_name = model_name prompt: Union[ChatPromptTemplate, str] = job self.name: str = name self.llm = llm if llm is not None else session.llm - # TODO improve to add more context of what other agents are available system_prompts = ( [('system', 'You are a {}'.format(role)), ('system', prompt)] if role is not None else [('system', prompt)] ) + if parser is not None: + system_prompts.append('\n{format_instructions}') system_prompts.append(MessagesPlaceholder(variable_name='messages')) self.prompt: ChatPromptTemplate = ( ChatPromptTemplate.from_messages(system_prompts) if isinstance(prompt, str) else prompt ) + if parser is not None: + self.prompt = self.prompt.partial( + format_instructions=parser.get_format_instructions() + ) + self.data_collector = data_collector def build(self) -> Runnable: executor = self.prompt | self.llm | StrOutputParser() - return FloLLMAgent(self.name, executor, self.model_name) + return FloLLMAgent( + self.name, executor, self.model_name, data_collector=self.data_collector + ) diff --git a/flo_ai/models/flo_node.py b/flo_ai/models/flo_node.py index 6b3163d4..67303dbe 100644 --- a/flo_ai/models/flo_node.py +++ b/flo_ai/models/flo_node.py @@ -5,7 +5,7 @@ from flo_ai.models.delegate import Delegate from langchain.agents import AgentExecutor from flo_ai.state.flo_state import TeamFloAgentState, STATE_NAME_MESSAGES -from langchain_core.messages import HumanMessage +from langchain_core.messages import AIMessage, HumanMessage from flo_ai.models.flo_executable import ExecutableType from flo_ai.state.flo_session import FloSession from typing import Optional, Type, List @@ -14,6 +14,8 @@ FloRouterCallback, FloCallback, ) +from flo_ai.common.flo_logger import get_logger +from flo_ai.state.flo_data_collector import FloDataCollector class FloNode: @@ -29,6 +31,9 @@ def __init__( self.kind: ExecutableType = kind self.delegate = delegate + def invoke(self, query, config): + return self.func({STATE_NAME_MESSAGES: [HumanMessage(content=query)]}) + class Builder: def __init__(self, session: FloSession) -> None: self.session = session @@ -40,6 +45,7 @@ def build_from_agent(self, flo_agent: FloAgent) -> 'FloNode': name=flo_agent.name, session=self.session, model_name=flo_agent.model_name, + data_collector=flo_agent.data_collector, ) return FloNode(agent_func, flo_agent.name, flo_agent.type) @@ -104,6 +110,7 @@ def __teamflo_agent_node( name: str, session: FloSession, model_name: str, + data_collector: Optional[FloDataCollector] = None, ): agent_cbs: List[FloAgentCallback] = FloNode.Builder.__filter_callbacks( session, FloAgentCallback @@ -122,6 +129,11 @@ def __teamflo_agent_node( try: result = agent.invoke(state) output = result if isinstance(result, str) else result['output'] + if data_collector is not None: + get_logger().info( + 'appending output to data collector', session=session + ) + data_collector.append(output) except Exception as e: [ callback.on_agent_error(name, model_name, e, **{}) @@ -140,7 +152,7 @@ def __teamflo_agent_node( callback.on_agent_start(name, model_name, output, **{}) for callback in flo_cbs ] - return {STATE_NAME_MESSAGES: [HumanMessage(content=output, name=name)]} + return {STATE_NAME_MESSAGES: [AIMessage(content=output, name=name)]} @staticmethod def __filter_callbacks(session: FloSession, type: Type): diff --git a/flo_ai/parsers/flo_json_parser.py b/flo_ai/parsers/flo_json_parser.py new file mode 100644 index 00000000..c1c11a7f --- /dev/null +++ b/flo_ai/parsers/flo_json_parser.py @@ -0,0 +1,61 @@ +import json +from flo_ai.parsers.flo_parser import FloParser +from typing import List, Dict, Any, Optional +from pydantic import BaseModel, Field, create_model +from flo_ai.error.flo_exception import FloException +from langchain_core.output_parsers import PydanticOutputParser +from dataclasses import dataclass + + +@dataclass +class ParseContract: + name: str + fields: List[Dict[str, Any]] + + +class FloJsonParser(FloParser): + def __init__(self, parse_contract: ParseContract): + self.contract = parse_contract + super().__init__() + + def __create_contract_from_json(self) -> BaseModel: + type_mapping = {'str': str, 'int': int, 'bool': bool, 'float': float} + pydantic_fields = { + field['name']: ( + type_mapping[field['type']], + Field(..., description=field['description']), + ) + for field in self.contract.fields + } + DynamicModel = create_model(self.contract.name, **pydantic_fields) + return DynamicModel + + def get_format_instructions(self): + return PydanticOutputParser( + pydantic_object=self.__create_contract_from_json() + ).get_format_instructions() + + def create(json_dict: Optional[Dict] = None, json_path: Optional[str] = None): + return FloJsonParser.Builder(json_dict=json_dict, json_path=json_path).build() + + class Builder: + def __init__( + self, json_dict: Optional[Dict] = None, json_path: Optional[str] = None + ): + if json_dict is None and json_path is None: + raise FloException( + 'Either of json_dict or json_path is required to build a FloJsonParser' + ) + self.json_dict = json_dict + self.json_path = json_path + + def build(self): + if self.json_dict: + name = self.json_dict['name'] + fields = self.json_dict['fields'] + else: + with open(self.json_path) as f: + json_contract = json.load(f) + name = json_contract['name'] + fields = json_contract['fields'] + return FloJsonParser(ParseContract(name=name, fields=fields)) diff --git a/flo_ai/parsers/flo_parser.py b/flo_ai/parsers/flo_parser.py new file mode 100644 index 00000000..65a06323 --- /dev/null +++ b/flo_ai/parsers/flo_parser.py @@ -0,0 +1,7 @@ +from abc import ABC, abstractmethod + + +class FloParser(ABC): + @abstractmethod + def get_format_instructions(self): + pass diff --git a/flo_ai/parsers/flo_pydantic_parser.py b/flo_ai/parsers/flo_pydantic_parser.py new file mode 100644 index 00000000..a140b34c --- /dev/null +++ b/flo_ai/parsers/flo_pydantic_parser.py @@ -0,0 +1,24 @@ +from flo_ai.parsers.flo_parser import FloParser +from pydantic import BaseModel +from langchain_core.output_parsers import PydanticOutputParser + + +class FloPydanticParser(FloParser): + def __init__(self, output_model: BaseModel): + self.model = output_model + super().__init__() + + def get_format_instructions(self): + return PydanticOutputParser( + pydantic_object=self.model + ).get_format_instructions() + + def create(output_model: BaseModel): + return FloPydanticParser.Builder(output_model).build() + + class Builder: + def __init__(self, output_model: BaseModel): + self.model = output_model + + def build(self): + return FloPydanticParser(self.model) diff --git a/flo_ai/state/flo_data_collector.py b/flo_ai/state/flo_data_collector.py new file mode 100644 index 00000000..78b07f2e --- /dev/null +++ b/flo_ai/state/flo_data_collector.py @@ -0,0 +1,19 @@ +from abc import ABC, abstractmethod + + +class FloDataCollector(ABC): + @abstractmethod + def append(): + pass + + @abstractmethod + def fetch(): + pass + + @abstractmethod + def peek(): + pass + + @abstractmethod + def pop(): + pass diff --git a/flo_ai/state/flo_kv_collector.py b/flo_ai/state/flo_kv_collector.py new file mode 100644 index 00000000..770b84f6 --- /dev/null +++ b/flo_ai/state/flo_kv_collector.py @@ -0,0 +1,36 @@ +import json +from typing import Dict, List, Any +from flo_ai.state.flo_data_collector import FloDataCollector + + +class FloKVCollector(FloDataCollector): + def __init__(self): + super().__init__() + self.data: List[Dict[str, Any]] = [] + + def append(self, agent_output): + output_dict = json.loads(self.__remove_after_braces(agent_output)) + self.data.append(output_dict) + + def __remove_after_braces(self, s: str) -> str: + first_brace = s.find('{') + last_brace = s.rfind('}') + + if first_brace != -1 and last_brace != -1 and first_brace < last_brace: + return s[first_brace : last_brace + 1] + return s + + def pop(self): + return self.data.pop() + + def peek(self): + return self.data[-1] if len(self.data) > 0 else None + + def fetch(self): + return self.__merge_data() + + def __merge_data(self): + result = {} + for d in self.data: + result.update(d) + return result diff --git a/flo_ai/state/flo_session.py b/flo_ai/state/flo_session.py index 82457fb0..3228e4fd 100644 --- a/flo_ai/state/flo_session.py +++ b/flo_ai/state/flo_session.py @@ -10,7 +10,9 @@ FloAgentCallback, FloRouterCallback, ) - +from flo_ai.state.flo_data_collector import FloDataCollector +from flo_ai.state.flo_kv_collector import FloKVCollector +from flo_ai.parsers.flo_parser import FloParser from typing import Optional @@ -45,6 +47,9 @@ def __init__( self.tools = dict() self.models: Dict[str, BaseLanguageModel] = dict() self.tools: Dict[str, BaseTool] = dict() + # TODO maybe create a default if not provided + self.data_collectors: Dict[str, FloDataCollector] = dict() + self.parsers: Dict[str, FloParser] = dict() self.counter = dict() self.navigation: list[str] = list() self.pattern_series = dict() @@ -79,6 +84,20 @@ def register_model(self, name: str, model: BaseLanguageModel): get_logger().info(f"Model '{name}' registered for session {self.session_id}") return self + def register_parser(self, name: str, parser: FloParser): + self.parsers[name] = parser + get_logger().info(f"Parser '{name}' registered for session {self.session_id}") + return self + + def register_data_collector( + self, name: str = '__default', collector: FloDataCollector = FloKVCollector() + ): + self.data_collectors[name] = collector + get_logger().info( + f"Data Collection '{name}' registered for session {self.session_id}" + ) + return self + def register_callback( self, callback: Union[FloRouterCallback, FloAgentCallback, FloToolCallback] ): diff --git a/flo_ai/yaml/config.py b/flo_ai/yaml/config.py index 66361f8e..ad90e6db 100644 --- a/flo_ai/yaml/config.py +++ b/flo_ai/yaml/config.py @@ -1,5 +1,5 @@ from pydantic import BaseModel -from typing import List, Union +from typing import List, Union, Dict, Any import yaml import re from typing import Optional @@ -41,6 +41,11 @@ class MemberKey(BaseModel): name: str +class Parser(BaseModel): + name: str + fields: Optional[List[Dict[str, Any]]] = None + + class AgentConfig(BaseModel): name: str role: Optional[str] = None @@ -50,6 +55,8 @@ class AgentConfig(BaseModel): to: Optional[List[MemberKey]] = None retry: Optional[int] = 1 model: Optional[str] = None + parser: Union[Parser, str] = None + data_collector: Optional[str] = None class EdgeConfig(BaseModel):