From e09b18a6051c2a510a36ae920943fc20ad5b03a7 Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Fri, 1 Aug 2025 12:33:35 +0530 Subject: [PATCH 1/4] Feat - implementation inside inputs and agent system prompts - added variables finding, validation and resolution inside run method of both agent and arium - having a resolved_variables flag for agent to avoid re-resolving (in case of muti-agent arium run) - variable_extractor util to find, validate and resolve variables. --- flo_ai/flo_ai/arium/arium.py | 84 +++++++++++- flo_ai/flo_ai/arium/builder.py | 12 +- flo_ai/flo_ai/models/agent.py | 103 ++++++++++---- flo_ai/flo_ai/models/base_agent.py | 3 +- flo_ai/flo_ai/utils/variable_extractor.py | 160 ++++++++++++++++++++++ 5 files changed, 330 insertions(+), 32 deletions(-) create mode 100644 flo_ai/flo_ai/utils/variable_extractor.py diff --git a/flo_ai/flo_ai/arium/arium.py b/flo_ai/flo_ai/arium/arium.py index 561d604f..7e4ed74a 100644 --- a/flo_ai/flo_ai/arium/arium.py +++ b/flo_ai/flo_ai/arium/arium.py @@ -1,11 +1,12 @@ from flo_ai.arium.base import BaseArium from flo_ai.arium.memory import MessageMemory, BaseMemory from flo_ai.llm.base_llm import ImageMessage -from typing import List +from typing import List, Dict, Any, Optional from flo_ai.models.agent import Agent from flo_ai.tool.base_tool import Tool from flo_ai.arium.models import StartNode, EndNode from flo_ai.utils.logger import logger +from flo_ai.utils.variable_extractor import extract_variables_from_inputs, extract_agent_variables, validate_multi_agent_variables, resolve_variables class Arium(BaseArium): @@ -18,7 +19,7 @@ def compile(self): self.validate_graph() self.is_compiled = True - async def run(self, inputs: List[str | ImageMessage]): + async def run(self, inputs: List[str | ImageMessage], variables: Optional[Dict[str, Any]] = None): if not self.is_compiled: raise ValueError('Arium is not compiled') @@ -27,8 +28,15 @@ async def run(self, inputs: List[str | ImageMessage]): if not self.nodes: raise ValueError('Arium has no nodes') + + # Extract and validate variables from inputs and all agents + self._extract_and_validate_variables(inputs, variables) + + # Resolve variables in inputs and agent prompts + resolved_inputs = self._resolve_inputs(inputs, variables) + self._resolve_agent_prompts(variables) - return await self._execute_graph(inputs) + return await self._execute_graph(resolved_inputs) async def _execute_graph(self, inputs: List[str | ImageMessage]): [self.memory.add(msg) for msg in inputs] @@ -58,10 +66,78 @@ async def _execute_graph(self, inputs: List[str | ImageMessage]): current_edge = next_edge return self.memory.get() + + def _extract_and_validate_variables(self, inputs: List[str | ImageMessage], variables: Dict[str, Any]) -> None: + """Extract variables from inputs and agents, then validate them. + + Args: + inputs: List of input messages + variables: Dictionary of variable name to value mappings + + Raises: + ValueError: If any required variables are missing + """ + # Extract variables from inputs + input_variables = extract_variables_from_inputs(inputs) + + # Extract variables from all agents in the workflow + agents_variables = {} + for node in self.nodes.values(): + if isinstance(node, Agent): + agent_vars = extract_agent_variables(node) + if agent_vars: + agents_variables[node.name] = agent_vars + + # Validate input variables separately with cleaner error message + if input_variables: + missing_input_vars = input_variables - set(variables.keys()) + if missing_input_vars: + provided_keys = sorted(variables.keys()) + raise ValueError( + f"Input contains missing variables: {sorted(missing_input_vars)}. " + f"Provided variables: {provided_keys}" + ) + + # Validate agent variables with detailed agent breakdown + if agents_variables: + validate_multi_agent_variables(agents_variables, variables) + + def _resolve_inputs(self, inputs: List[str | ImageMessage], variables: Dict[str, Any]) -> List[str | ImageMessage]: + """Resolve variables in input messages. + + Args: + inputs: List of input messages + variables: Dictionary of variable name to value mappings + + Returns: + List of inputs with variables resolved + """ + resolved_inputs = [] + for input_item in inputs: + if isinstance(input_item, str): + # Resolve variables in text input + resolved_input = resolve_variables(input_item, variables) + resolved_inputs.append(resolved_input) + else: + # ImageMessage objects don't need variable resolution + resolved_inputs.append(input_item) + return resolved_inputs + + def _resolve_agent_prompts(self, variables: Dict[str, Any]) -> None: + """Resolve variables in all agent system prompts and mark them as resolved. + + Args: + variables: Dictionary of variable name to value mappings + """ + for node in self.nodes.values(): + if isinstance(node, Agent): + node.system_prompt = resolve_variables(node.system_prompt, variables) + node.resolved_variables = True async def _execute_node(self, node: Agent | Tool | StartNode | EndNode): if isinstance(node, Agent): - return await node.run(self.memory.get()) + # Variables are already resolved, pass empty dict to avoid re-processing + return await node.run(self.memory.get(), variables={}) elif isinstance(node, Tool): return await node.execute(self.memory.get()) elif isinstance(node, StartNode): diff --git a/flo_ai/flo_ai/arium/builder.py b/flo_ai/flo_ai/arium/builder.py index 7511bb2a..c1372220 100644 --- a/flo_ai/flo_ai/arium/builder.py +++ b/flo_ai/flo_ai/arium/builder.py @@ -1,4 +1,4 @@ -from typing import List, Optional, Callable, Union +from typing import List, Optional, Callable, Union, Dict, Any from flo_ai.arium.arium import Arium from flo_ai.arium.memory import MessageMemory, BaseMemory from flo_ai.models.agent import Agent @@ -128,10 +128,14 @@ def build(self) -> Arium: self._arium = arium return arium - async def build_and_run(self, inputs: List[Union[str, ImageMessage]]) -> List[dict]: - """Build the Arium and run it with the given inputs.""" + async def build_and_run( + self, + inputs: List[Union[str, ImageMessage]], + variables: Optional[Dict[str, Any]] = None + ) -> List[dict]: + """Build the Arium and run it with the given inputs and optional runtime variables.""" arium = self.build() - return await arium.run(inputs) + return await arium.run(inputs, variables=variables) def visualize( self, output_path: str = 'arium_graph.png', title: str = 'Arium Workflow' diff --git a/flo_ai/flo_ai/models/agent.py b/flo_ai/flo_ai/models/agent.py index fd1cbc84..861621cb 100644 --- a/flo_ai/flo_ai/models/agent.py +++ b/flo_ai/flo_ai/models/agent.py @@ -5,6 +5,7 @@ from flo_ai.tool.base_tool import Tool, ToolExecutionError from flo_ai.models.agent_error import AgentError from flo_ai.utils.logger import logger +from flo_ai.utils.variable_extractor import extract_variables_from_inputs, extract_agent_variables, validate_multi_agent_variables, resolve_variables class Agent(BaseAgent): @@ -40,35 +41,73 @@ def __init__( self.output_schema = output_schema self.role = role - async def run(self, inputs: List[str | ImageMessage] | str) -> str: + async def run(self, inputs: List[str | ImageMessage] | str, variables: Optional[Dict[str, Any]] = None) -> str: + variables = variables or {} + if isinstance(inputs, str): inputs = [inputs] - - for input in inputs: - if isinstance(input, ImageMessage): - self.add_to_history('user', self.llm.format_image_in_message(input)) - else: - self.add_to_history('user', input) + + # Perform runtime variable validation if not already resolved (single agent usage) + if not self.resolved_variables: + # Extract variables from inputs and system prompt + input_variables = extract_variables_from_inputs(inputs) + agent_variables = extract_agent_variables(self) + all_required_variables = input_variables.union(agent_variables) + + # Validate that all required variables are provided + if all_required_variables: + agents_variables = {self.name: all_required_variables} + validate_multi_agent_variables(agents_variables, variables) + + # Resolve variables and mark as resolved + self.system_prompt = resolve_variables(self.system_prompt, variables) + + # Process inputs and resolve variables in string inputs + for input in inputs: + if isinstance(input, ImageMessage): + self.add_to_history('user', self.llm.format_image_in_message(input)) + else: + # Resolve variables in text input + resolved_input = resolve_variables(input, variables) + self.add_to_history('user', resolved_input) + + # after resolving agent system prompts and inputs, mark variables as resolved + self.resolved_variables = True + + else: + # Variables already resolved, process inputs without variable resolution + for input in inputs: + if isinstance(input, ImageMessage): + self.add_to_history('user', self.llm.format_image_in_message(input)) + else: + self.add_to_history('user', input) retry_count = 0 # If no tools, act as conversational agent if not self.tools: - return await self._run_conversational(retry_count) + return await self._run_conversational(retry_count, variables) # Otherwise, run as tool agent - return await self._run_with_tools(retry_count) + return await self._run_with_tools(retry_count, variables) - async def _run_conversational(self, retry_count: int) -> str: + async def _run_conversational(self, retry_count: int, variables: Optional[Dict[str, Any]] = None) -> str: """Run as a conversational agent when no tools are provided""" + variables = variables or {} + while retry_count < self.max_retries: try: + # Resolve variables in system prompt + system_content = ( + self._get_cot_prompt(variables) + if self.reasoning_pattern == ReasoningPattern.COT + else resolve_variables(self.system_prompt, variables) + ) + messages = [ { 'role': 'system', - 'content': self._get_cot_prompt() - if self.reasoning_pattern == ReasoningPattern.COT - else self.system_prompt, + 'content': system_content, } ] + self.conversation_history @@ -111,18 +150,24 @@ async def _run_conversational(self, retry_count: int) -> str: original_error=e, ) - async def _run_with_tools(self, retry_count: int = 0) -> str: + async def _run_with_tools(self, retry_count: int = 0, variables: Optional[Dict[str, Any]] = None) -> str: """Run as a tool-using agent when tools are provided""" + variables = variables or {} + while retry_count < self.max_retries: try: + # Resolve variables in system prompt based on reasoning pattern + if self.reasoning_pattern == ReasoningPattern.REACT: + system_content = self._get_react_prompt(variables) + elif self.reasoning_pattern == ReasoningPattern.COT: + system_content = self._get_cot_prompt(variables) + else: + system_content = resolve_variables(self.system_prompt, variables) + messages = [ { 'role': 'system', - 'content': self._get_react_prompt() - if self.reasoning_pattern == ReasoningPattern.REACT - else self._get_cot_prompt() - if self.reasoning_pattern == ReasoningPattern.COT - else self.system_prompt, + 'content': system_content, } ] + self.conversation_history @@ -239,12 +284,18 @@ async def _run_with_tools(self, retry_count: int = 0) -> str: raise AgentError(f'Failed after maximum {self.max_retries} attempts.') - def _get_react_prompt(self) -> str: + def _get_react_prompt(self, variables: Optional[Dict[str, Any]] = None) -> str: """Get system prompt modified for ReACT pattern""" + variables = variables or {} + tools_desc = '\n'.join( [f'- {tool.name}: {tool.description}' for tool in self.tools] ) - react_prompt = f"""{self.system_prompt} + + # Resolve variables in the base system prompt + resolved_system_prompt = resolve_variables(self.system_prompt, variables) + + react_prompt = f"""{resolved_system_prompt} When solving tasks, follow this format: Thought: Analyze the situation and think about what to do @@ -263,12 +314,18 @@ def _get_react_prompt(self) -> str: return react_prompt - def _get_cot_prompt(self) -> str: + def _get_cot_prompt(self, variables: Optional[Dict[str, Any]] = None) -> str: """Get system prompt modified for Chain of Thought pattern""" + variables = variables or {} + tools_desc = '\n'.join( [f'- {tool.name}: {tool.description}' for tool in self.tools] ) - cot_prompt = f"""{self.system_prompt} + + # Resolve variables in the base system prompt + resolved_system_prompt = resolve_variables(self.system_prompt, variables) + + cot_prompt = f"""{resolved_system_prompt} When solving tasks, follow this Chain of Thought reasoning format: Let me think through this step by step: diff --git a/flo_ai/flo_ai/models/base_agent.py b/flo_ai/flo_ai/models/base_agent.py index 106c2788..a46fd820 100644 --- a/flo_ai/flo_ai/models/base_agent.py +++ b/flo_ai/flo_ai/models/base_agent.py @@ -1,4 +1,4 @@ -from typing import Dict, Any, List, Tuple +from typing import Dict, Any, List, Tuple, Optional from abc import ABC, abstractmethod from enum import Enum from flo_ai.llm.base_llm import BaseLLM @@ -29,6 +29,7 @@ def __init__( self.agent_type = agent_type self.llm = llm self.max_retries = max_retries + self.resolved_variables = False self.conversation_history: List[Dict[str, str]] = [] @abstractmethod diff --git a/flo_ai/flo_ai/utils/variable_extractor.py b/flo_ai/flo_ai/utils/variable_extractor.py new file mode 100644 index 00000000..514ed1f0 --- /dev/null +++ b/flo_ai/flo_ai/utils/variable_extractor.py @@ -0,0 +1,160 @@ +""" +Variable extraction utilities for the Flo AI framework. + +This module provides functions to extract variable placeholders from text, +inputs, and agent configurations for runtime variable validation. +""" + +import re +from typing import List, Set, Dict, Any +from flo_ai.llm.base_llm import ImageMessage + + +def extract_variables_from_text(text: str) -> Set[str]: + """Extract variable placeholders from text using pattern. + + Args: + text: Text containing variable placeholders like + + Returns: + Set of variable names found in the text + + Examples: + >>> extract_variables_from_text("Hello , your is important") + {'name', 'role'} + >>> extract_variables_from_text("No variables here") + set() + """ + if not text: + return set() + + # Use regex to find all patterns + # \w+ matches word characters (letters, digits, underscore) + variable_pattern = r'<(\w+)>' + matches = re.findall(variable_pattern, text) + return set(matches) + + +def extract_variables_from_inputs(inputs: List[str | ImageMessage]) -> Set[str]: + """Extract variable placeholders from a list of input messages. + + Args: + inputs: List of input strings or ImageMessages + + Returns: + Set of variable names found across all string inputs + + Note: + ImageMessage objects are skipped as they don't contain variable placeholders + """ + all_variables = set() + + for input_item in inputs: + if isinstance(input_item, str): + variables = extract_variables_from_text(input_item) + all_variables.update(variables) + # Skip ImageMessage objects as they don't contain text variables + + return all_variables + + +def extract_agent_variables(agent) -> Set[str]: + """Extract variable placeholders from an agent's system prompt. + + Args: + agent: Agent instance with a system_prompt attribute + + Returns: + Set of variable names found in the agent's system prompt + + Note: + This function avoids importing Agent to prevent circular imports + """ + if not hasattr(agent, 'system_prompt'): + return set() + + return extract_variables_from_text(agent.system_prompt) + + +def validate_variables( + required_variables: Set[str], + provided_variables: Dict[str, Any], + context: str = "" +) -> None: + """Validate that all required variables are provided. + + Args: + required_variables: Set of variable names that are required + provided_variables: Dictionary of variable name to value mappings + context: Optional context string for error messages (e.g., agent name) + + Raises: + ValueError: If any required variables are missing + """ + missing = required_variables - set(provided_variables.keys()) + if missing: + available = list(provided_variables.keys()) if provided_variables else [] + context_str = f" for {context}" if context else "" + raise ValueError( + f"Missing required variables{context_str}: {sorted(missing)}. " + f"Available variables: {available}" + ) + + +def resolve_variables(text: str, variables: Dict[str, Any]) -> str: + """Replace patterns with actual values + + Args: + text: Text containing variable placeholders like + variables: Dictionary of variable name to value mappings + + Returns: + Text with variables resolved + + Raises: + ValueError: If a variable placeholder is found but not provided in variables + """ + if not text or not variables: + return text + + def replace_var(match): + var_name = match.group(1) + if var_name in variables: + return str(variables[var_name]) + else: + available = list(variables.keys()) + raise ValueError( + f"Variable '{var_name}' referenced in text but not provided. " + f"Available variables: {available}" + ) + + return re.sub(r'<(\w+)>', replace_var, text) + + +def validate_multi_agent_variables( + agents_variables: Dict[str, Set[str]], + provided_variables: Dict[str, Any] +) -> None: + """Validate variables for multiple agents and provide detailed error messages. + + Args: + agents_variables: Dictionary mapping agent names to their required variables + provided_variables: Dictionary of variable name to value mappings + + Raises: + ValueError: If any agents are missing required variables, with detailed breakdown + """ + missing_by_agent = {} + provided_keys = set(provided_variables.keys()) + + for agent_name, required_vars in agents_variables.items(): + missing = required_vars - provided_keys + if missing: + missing_by_agent[agent_name] = sorted(missing) + + if missing_by_agent: + error_msg = "Missing required variables for agents:\n" + for agent_name, missing_vars in missing_by_agent.items(): + error_msg += f" - Agent '{agent_name}': {missing_vars}\n" + error_msg += f"Provided variables: {sorted(provided_keys)}" + raise ValueError(error_msg) \ No newline at end of file From 3425a2678d40fa11e61b98dfe5c86fc8f025171d Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Fri, 1 Aug 2025 12:39:24 +0530 Subject: [PATCH 2/4] remove unused import --- flo_ai/flo_ai/models/base_agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flo_ai/flo_ai/models/base_agent.py b/flo_ai/flo_ai/models/base_agent.py index a46fd820..ab506b46 100644 --- a/flo_ai/flo_ai/models/base_agent.py +++ b/flo_ai/flo_ai/models/base_agent.py @@ -1,4 +1,4 @@ -from typing import Dict, Any, List, Tuple, Optional +from typing import Dict, Any, List, Tuple from abc import ABC, abstractmethod from enum import Enum from flo_ai.llm.base_llm import BaseLLM From bcfc91b5ef0a8f9e0407b40d2182075b15c0d04b Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Fri, 1 Aug 2025 13:16:28 +0530 Subject: [PATCH 3/4] variables examples, lint fix --- flo_ai/examples/variables_workflow_example.py | 242 +++++++++++++ .../variables_workflow_yaml_example.py | 325 ++++++++++++++++++ flo_ai/flo_ai/arium/arium.py | 51 ++- flo_ai/flo_ai/arium/builder.py | 6 +- flo_ai/flo_ai/models/agent.py | 53 +-- flo_ai/flo_ai/utils/variable_extractor.py | 71 ++-- 6 files changed, 669 insertions(+), 79 deletions(-) create mode 100644 flo_ai/examples/variables_workflow_example.py create mode 100644 flo_ai/examples/variables_workflow_yaml_example.py diff --git a/flo_ai/examples/variables_workflow_example.py b/flo_ai/examples/variables_workflow_example.py new file mode 100644 index 00000000..4b8b0609 --- /dev/null +++ b/flo_ai/examples/variables_workflow_example.py @@ -0,0 +1,242 @@ +""" +Simple variables workflow example with separate functions for complete and incomplete variables. + +This example shows: +1. Simple function to test complete variables (success case) +2. Simple function to test incomplete variables (error case) +""" + +import os +import asyncio +from flo_ai.builder.agent_builder import AgentBuilder +from flo_ai.arium.builder import AriumBuilder +from flo_ai.models.agent import Agent +from flo_ai.llm.gemini_llm import Gemini + +from dotenv import load_dotenv + +load_dotenv() + +api_key = os.getenv('GOOGLE_API_KEY') + + +async def test_single_agent_complete_variables(): + """Test single agent with complete variables""" + print('\n🟢 TESTING SINGLE AGENT - COMPLETE VARIABLES') + print('-' * 50) + + llm = Gemini(model='gemini-2.5-flash', temperature=0.7, api_key=api_key) + + # Create agent that uses variables (automatically extracted) + translator = ( + AgentBuilder() + .with_name('translator') + .with_prompt('You are a translator. Use this tone: ') + .with_llm(llm) + .build() + ) + + # Complete variables + variables = { + 'target_language': 'Spanish', + 'tone': 'formal', + 'text_to_translate': 'Welcome to our application', + } + + print('Variables provided:') + for key, value in variables.items(): + print(f' āœ“ {key}: {value}') + + try: + result = await translator.run( + 'Translate the following text: to ', + variables=variables, + ) + print('\nāœ… SUCCESS: Single agent executed successfully!') + print(f'Result: {result}') + except Exception as e: + print(f'āŒ Unexpected error: {e}') + + +async def test_single_agent_incomplete_variables(): + """Test single agent with incomplete variables""" + print('\nšŸ”“ TESTING SINGLE AGENT - INCOMPLETE VARIABLES') + print('-' * 50) + + llm = Gemini(model='gemini-2.5-flash', temperature=0.7, api_key=api_key) + + # Create agent that uses variables (automatically extracted) + calculator = ( + AgentBuilder() + .with_name('calculator') + .with_prompt('You are a calculator. Show the result in format.') + .with_llm(llm) + .build() + ) + + # Incomplete variables - missing some required variables + incomplete_variables = { + 'operation': 'addition', + 'number1': '15', + # Missing: 'number2' and 'format' + } + + print('Variables provided:') + for key, value in incomplete_variables.items(): + print(f' āœ“ {key}: {value}') + + print('\nMissing variables:') + print(' āŒ number2') + print(' āŒ format') + + try: + await calculator.run( + 'Please calculate the result of of and .', + variables=incomplete_variables, + ) + print("āŒ ERROR: Should have failed but didn't!") + except ValueError as e: + print('\nāœ… SUCCESS: Expected error caught!') + print(f'Error message: {e}') + + +async def test_multi_agent_complete_variables(): + """Test workflow with complete variables - should succeed""" + print('🟢 TESTING MULTI AGENT - COMPLETE VARIABLES') + print('-' * 40) + + llm = Gemini(model='gemini-2.5-flash', temperature=0.7, api_key=api_key) + + # Agent 1: Content Creator - uses 'topic' and 'style' variables + content_creator = Agent( + name='content_creator', + system_prompt='You are a content creator. Write in a