diff --git a/README.md b/README.md index b4995f86..542365e1 100644 --- a/README.md +++ b/README.md @@ -87,6 +87,8 @@ Flo AI is a Python framework that makes building production-ready AI agents and - [Builder Pattern Benefits](#builder-pattern-benefits) - [πŸ“„ YAML-Based Arium Workflows](#-yaml-based-arium-workflows) - [🧠 LLM-Powered Routers in YAML (NEW!)](#-llm-powered-routers-in-yaml-new) + - [πŸ”„ ReflectionRouter: Structured Reflection Workflows (NEW!)](#-reflectionrouter-structured-reflection-workflows-new) + - [πŸ”„ PlanExecuteRouter: Cursor-Style Plan-and-Execute Workflows (NEW!)](#-planexecuterouter-cursor-style-plan-and-execute-workflows-new) - [πŸ“– Documentation](#-documentation) - [🌟 Why Flo AI?](#-why-flo-ai) - [🎯 Use Cases](#-use-cases) @@ -1429,6 +1431,8 @@ arium: 1. **Smart Router** (`type: smart`): General-purpose routing based on content analysis 2. **Task Classifier** (`type: task_classifier`): Routes based on keywords and examples 3. **Conversation Analysis** (`type: conversation_analysis`): Context-aware routing +4. **Reflection Router** (`type: reflection`): Structured Aβ†’Bβ†’Aβ†’C patterns for reflection workflows +5. **PlanExecute Router** (`type: plan_execute`): Cursor-style plan-and-execute workflows with step tracking **✨ Key Benefits:** - 🚫 **No Code Required**: Define routing logic purely in YAML @@ -1450,6 +1454,560 @@ async def run_intelligent_workflow(): return result ``` +##### πŸ”„ ReflectionRouter: Structured Reflection Workflows (NEW!) + +The **ReflectionRouter** is designed specifically for reflection-based workflows that follow Aβ†’Bβ†’Aβ†’C patterns, commonly used for mainβ†’criticβ†’mainβ†’final agent sequences. This pattern is perfect for iterative improvement workflows where a critic agent provides feedback before final processing. 
+ +**πŸ“‹ Key Features:** +- 🎯 **Pattern Tracking**: Automatically tracks progress through defined reflection sequences +- πŸ”„ **Self-Reference Support**: Allows routing back to the same agent (Aβ†’Bβ†’A patterns) +- πŸ“Š **Visual Progress**: Shows current position with β—‹ pending, βœ“ completed indicators +- πŸ›‘οΈ **Loop Prevention**: Built-in safety mechanisms to prevent infinite loops +- πŸŽ›οΈ **Flexible Patterns**: Supports both 2-agent (Aβ†’Bβ†’A) and 3-agent (Aβ†’Bβ†’Aβ†’C) flows + +**🎯 Supported Patterns:** + +1. **A β†’ B β†’ A** (2 agents): Main β†’ Critic β†’ Main β†’ End +2. **A β†’ B β†’ A β†’ C** (3 agents): Main β†’ Critic β†’ Main β†’ Final + +```yaml +# Simple A β†’ B β†’ A reflection pattern +metadata: + name: "content-reflection-workflow" + version: "1.0.0" + description: "Content creation with critic feedback loop" + +arium: + agents: + - name: "writer" + role: "Content Writer" + job: "Create and improve content based on feedback from critics." + model: + provider: "openai" + name: "gpt-4o-mini" + settings: + temperature: 0.7 + + - name: "critic" + role: "Content Critic" + job: "Review content and provide constructive feedback for improvement." 
+ model: + provider: "openai" + name: "gpt-4o-mini" + settings: + temperature: 0.3 + + # ✨ ReflectionRouter definition + routers: + - name: "reflection_router" + type: "reflection" # Specialized for reflection patterns + flow_pattern: [writer, critic, writer] # A β†’ B β†’ A pattern + model: + provider: "openai" + name: "gpt-4o-mini" + settings: + temperature: 0.2 + allow_early_exit: false # Strict adherence to pattern + + workflow: + start: "writer" + edges: + - from: "writer" + to: [critic, writer] # Can go to critic or self-reference + router: "reflection_router" + - from: "critic" + to: [writer] # Always returns to writer + router: "reflection_router" + end: [writer] # Writer produces final output +``` + +```yaml +# Advanced A β†’ B β†’ A β†’ C reflection pattern +metadata: + name: "advanced-reflection-workflow" + version: "1.0.0" + description: "Full reflection cycle with dedicated final agent" + +arium: + agents: + - name: "researcher" + role: "Research Agent" + job: "Conduct research and gather information on topics." + model: + provider: "openai" + name: "gpt-4o-mini" + + - name: "reviewer" + role: "Research Reviewer" + job: "Review research quality and suggest improvements." + model: + provider: "anthropic" + name: "claude-3-5-sonnet-20240620" + + - name: "synthesizer" + role: "Information Synthesizer" + job: "Create final synthesis and conclusions from research." 
+ model: + provider: "openai" + name: "gpt-4o" + + routers: + - name: "research_reflection_router" + type: "reflection" + flow_pattern: [researcher, reviewer, researcher, synthesizer] # A β†’ B β†’ A β†’ C + settings: + allow_early_exit: true # Allow smart early completion + + workflow: + start: "researcher" + edges: + - from: "researcher" + to: [reviewer, researcher, synthesizer] # All possible destinations + router: "research_reflection_router" + - from: "reviewer" + to: [researcher, reviewer, synthesizer] + router: "research_reflection_router" + - from: "synthesizer" + to: [end] + end: [synthesizer] +``` + +**πŸ”§ ReflectionRouter Configuration Options:** + +```yaml +routers: + - name: "my_reflection_router" + type: "reflection" + flow_pattern: [main_agent, critic, main_agent, final_agent] # Define your pattern + model: # Optional: LLM for routing decisions + provider: "openai" + name: "gpt-4o-mini" + settings: # Optional settings + temperature: 0.2 # Router temperature (lower = more deterministic) + allow_early_exit: false # Allow early completion if LLM determines pattern is done + fallback_strategy: "first" # first, last, random - fallback when LLM fails +``` + +**πŸ—οΈ Programmatic Usage:** + +```python +import asyncio +from flo_ai.arium import AriumBuilder +from flo_ai.models.agent import Agent +from flo_ai.llm import OpenAI +from flo_ai.arium.llm_router import create_main_critic_reflection_router + +async def reflection_workflow_example(): + llm = OpenAI(model='gpt-4o-mini', api_key='your-api-key') + + # Create agents + main_agent = Agent( + name='main_agent', + system_prompt='Create solutions and improve them based on feedback.', + llm=llm + ) + + critic = Agent( + name='critic', + system_prompt='Provide constructive feedback for improvement.', + llm=llm + ) + + final_agent = Agent( + name='final_agent', + system_prompt='Polish and finalize the work.', + llm=llm + ) + + # Create reflection router - A β†’ B β†’ A β†’ C pattern + reflection_router = 
create_main_critic_reflection_router( + main_agent='main_agent', + critic_agent='critic', + final_agent='final_agent', + allow_early_exit=False, # Strict pattern adherence + llm=llm + ) + + # Build workflow + result = await ( + AriumBuilder() + .add_agents([main_agent, critic, final_agent]) + .start_with(main_agent) + .add_edge(main_agent, [critic, final_agent], reflection_router) + .add_edge(critic, [main_agent, final_agent], reflection_router) + .end_with(final_agent) + .build_and_run(["Create a comprehensive project proposal"]) + ) + + return result + +# Alternative: Direct factory usage +from flo_ai.arium.llm_router import create_llm_router + +reflection_router = create_llm_router( + 'reflection', + flow_pattern=['writer', 'editor', 'writer'], # A β†’ B β†’ A + allow_early_exit=False, + llm=llm +) +``` + +**πŸ’‘ ReflectionRouter Intelligence:** + +The ReflectionRouter automatically: +- **Tracks Progress**: Knows which step in the pattern should execute next +- **Prevents Loops**: Uses execution context to avoid infinite cycles +- **Provides Guidance**: Shows LLM the suggested next step and current progress +- **Handles Self-Reference**: Properly validates flows that return to the same agent +- **Visual Feedback**: Displays pattern progress: `β—‹ writer β†’ βœ“ critic β†’ β—‹ writer` + +**🎯 Perfect Use Cases:** +- πŸ“ **Content Creation**: Writer β†’ Editor β†’ Writer β†’ Publisher +- πŸ”¬ **Research Workflows**: Researcher β†’ Reviewer β†’ Researcher β†’ Synthesizer +- πŸ’Ό **Business Analysis**: Analyst β†’ Critic β†’ Analyst β†’ Decision Maker +- 🎨 **Creative Processes**: Creator β†’ Critic β†’ Creator β†’ Finalizer +- πŸ§ͺ **Iterative Refinement**: Any process requiring feedback and improvement cycles + +**⚑ Quick Start Example:** + +```python +# Minimal A β†’ B β†’ A pattern +yaml_config = """ +arium: + agents: + - name: main_agent + job: "Main work agent" + model: {provider: openai, name: gpt-4o-mini} + - name: critic + job: "Feedback agent" + model: 
{provider: openai, name: gpt-4o-mini} + + routers: + - name: reflection_router + type: reflection + flow_pattern: [main_agent, critic, main_agent] + + workflow: + start: main_agent + edges: + - from: main_agent + to: [critic, main_agent] + router: reflection_router + - from: critic + to: [main_agent] + router: reflection_router + end: [main_agent] +""" + +result = await AriumBuilder().from_yaml(yaml_str=yaml_config).build_and_run(["Your task"]) +``` + +The ReflectionRouter makes implementing sophisticated feedback loops and iterative improvement workflows incredibly simple, whether you need a 2-agent or 3-agent pattern! πŸš€ + +##### πŸ”„ PlanExecuteRouter: Cursor-Style Plan-and-Execute Workflows (NEW!) + +The **PlanExecuteRouter** implements sophisticated plan-and-execute patterns similar to how Cursor works. It automatically breaks down complex tasks into detailed execution plans and coordinates step-by-step execution with intelligent progress tracking. + +**πŸ“‹ Key Features:** +- 🎯 **Automatic Task Breakdown**: Creates detailed execution plans from high-level tasks +- πŸ“Š **Step Tracking**: Real-time progress monitoring with visual indicators (β—‹ ⏳ βœ… ❌) +- πŸ”„ **Phase Coordination**: Intelligent routing between planning, execution, and review phases +- πŸ›‘οΈ **Dependency Management**: Handles step dependencies and execution order automatically +- πŸ’Ύ **Plan Persistence**: Uses PlanAwareMemory for stateful plan storage and updates +- πŸ”§ **Error Recovery**: Built-in retry logic for failed steps + +**🎯 Perfect for Cursor-Style Workflows:** +- πŸ’» **Software Development**: Requirements β†’ Design β†’ Implementation β†’ Testing β†’ Review +- πŸ“ **Content Creation**: Planning β†’ Writing β†’ Editing β†’ Review β†’ Publishing +- πŸ”¬ **Research Projects**: Plan β†’ Investigate β†’ Analyze β†’ Synthesize β†’ Report +- πŸ“Š **Business Processes**: Any multi-step workflow with dependencies + +**πŸ“„ YAML Configuration:** + +```yaml +# Complete Plan-Execute 
Workflow +metadata: + name: "development-plan-execute" + version: "1.0.0" + description: "Cursor-style development workflow" + +arium: + agents: + - name: planner + role: Project Planner + job: > + Break down complex development tasks into detailed, sequential execution plans. + Create clear steps with dependencies and agent assignments. + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.3 + + - name: developer + role: Software Developer + job: > + Implement features step by step according to execution plans. + Provide detailed implementation and update step status. + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.5 + + - name: tester + role: QA Engineer + job: > + Test implementations thoroughly and validate functionality. + Create comprehensive test scenarios and report results. + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.2 + + - name: reviewer + role: Senior Reviewer + job: > + Provide final quality assessment and approval. + Review completed work for best practices and requirements. 
+ model: + provider: openai + name: gpt-4o-mini + + # PlanExecuteRouter configuration + routers: + - name: dev_plan_router + type: plan_execute # Router type for plan-execute workflows + agents: # Available agents and their capabilities + planner: "Creates detailed execution plans by breaking down tasks" + developer: "Implements features and code according to plan specifications" + tester: "Tests implementations and validates functionality" + reviewer: "Reviews and approves completed work" + model: # Optional: LLM for routing decisions + provider: openai + name: gpt-4o-mini + settings: # Optional configuration + temperature: 0.2 # Router decision temperature + planner_agent: planner # Agent responsible for creating plans + executor_agent: developer # Default agent for executing steps + reviewer_agent: reviewer # Optional agent for final review + max_retries: 3 # Maximum retries for failed steps + + workflow: + start: planner + edges: + # All agents can route to all others based on plan state + - from: planner + to: [developer, tester, reviewer, planner] + router: dev_plan_router + - from: developer + to: [developer, tester, reviewer, planner] + router: dev_plan_router + - from: tester + to: [developer, tester, reviewer, planner] + router: dev_plan_router + - from: reviewer + to: [end] + end: [reviewer] +``` + +**πŸ—οΈ Programmatic Usage:** + +```python +import asyncio +from flo_ai.arium import AriumBuilder +from flo_ai.arium.memory import PlanAwareMemory +from flo_ai.models.agent import Agent +from flo_ai.llm import OpenAI +from flo_ai.arium.llm_router import create_plan_execute_router + +async def cursor_style_workflow(): + llm = OpenAI(model='gpt-4o-mini', api_key='your-api-key') + + # Create specialized agents + planner = Agent( + name='planner', + system_prompt='Create detailed execution plans by breaking down tasks into sequential steps.', + llm=llm + ) + + developer = Agent( + name='developer', + system_prompt='Implement features step by step according to 
execution plans.', + llm=llm + ) + + tester = Agent( + name='tester', + system_prompt='Test implementations and validate functionality thoroughly.', + llm=llm + ) + + reviewer = Agent( + name='reviewer', + system_prompt='Review completed work and provide final approval.', + llm=llm + ) + + # Create plan-execute router + plan_router = create_plan_execute_router( + planner_agent='planner', + executor_agent='developer', + reviewer_agent='reviewer', + additional_agents={'tester': 'Tests implementations and validates quality'}, + llm=llm + ) + + # Use PlanAwareMemory for plan state persistence + memory = PlanAwareMemory() + + # Build and run workflow + result = await ( + AriumBuilder() + .with_memory(memory) + .add_agents([planner, developer, tester, reviewer]) + .start_with(planner) + .add_edge(planner, [developer, tester, reviewer, planner], plan_router) + .add_edge(developer, [developer, tester, reviewer, planner], plan_router) + .add_edge(tester, [developer, tester, reviewer, planner], plan_router) + .add_edge(reviewer, [developer, tester, reviewer, planner], plan_router) + .end_with(reviewer) + .build_and_run(["Create a REST API for user authentication with JWT tokens"]) + ) + + return result + +# Alternative: Factory function +from flo_ai.arium.llm_router import create_plan_execute_router + +plan_router = create_plan_execute_router( + planner_agent='planner', + executor_agent='developer', + reviewer_agent='reviewer', + llm=llm +) +``` + +**πŸ’‘ How PlanExecuteRouter Works:** + +The router intelligently coordinates workflow phases: + +1. **Planning Phase**: + - Detects when no execution plan exists + - Routes to planner agent to create detailed plan + - Plan stored as ExecutionPlan object in PlanAwareMemory + +2. **Execution Phase**: + - Analyzes plan state and step dependencies + - Routes to appropriate agents for next ready steps + - Updates step status (pending β†’ in-progress β†’ completed) + - Handles parallel execution of independent steps + +3. 
**Review Phase**: + - Detects when all steps are completed + - Routes to reviewer agent for final validation + - Manages error recovery for failed steps + +**πŸ“Š Plan Progress Visualization:** + +``` +πŸ“‹ EXECUTION PLAN: User Authentication API +πŸ“Š CURRENT PROGRESS: +βœ… design_schema: Design user database schema β†’ developer +βœ… implement_registration: Create registration endpoint β†’ developer +⏳ implement_login: Add login with JWT β†’ developer (depends: design_schema, implement_registration) +β—‹ add_middleware: Authentication middleware β†’ developer (depends: implement_login) +β—‹ write_tests: Comprehensive testing β†’ tester (depends: add_middleware) +β—‹ final_review: Security and code review β†’ reviewer (depends: write_tests) + +🎯 NEXT ACTION: Execute step 'implement_login' +🎯 SUGGESTED AGENT: developer +``` + +**πŸ”§ Advanced Configuration Options:** + +```yaml +routers: + - name: advanced_plan_router + type: plan_execute + agents: + planner: "Creates execution plans" + frontend_dev: "Frontend implementation" + backend_dev: "Backend implementation" + devops: "Deployment and infrastructure" + qa_tester: "Quality assurance testing" + security_reviewer: "Security review" + product_owner: "Product validation" + model: + provider: openai + name: gpt-4o + settings: + temperature: 0.1 # Lower for more deterministic routing + planner_agent: planner # Plan creation agent + executor_agent: backend_dev # Default execution agent + reviewer_agent: product_owner # Final review agent + max_retries: 5 # Retry attempts for failed steps + allow_parallel_execution: true # Enable parallel step execution + plan_validation: strict # Validate plan completeness +``` + +**⚑ Quick Start Example:** + +```python +# Minimal plan-execute workflow +yaml_config = """ +arium: + agents: + - name: planner + job: "Create execution plans" + model: {provider: openai, name: gpt-4o-mini} + - name: executor + job: "Execute plan steps" + model: {provider: openai, name: gpt-4o-mini} + - 
name: reviewer + job: "Review final results" + model: {provider: openai, name: gpt-4o-mini} + + routers: + - name: simple_plan_router + type: plan_execute + agents: + planner: "Creates plans" + executor: "Executes steps" + reviewer: "Reviews results" + settings: + planner_agent: planner + executor_agent: executor + reviewer_agent: reviewer + + workflow: + start: planner + edges: + - from: planner + to: [executor, reviewer, planner] + router: simple_plan_router + - from: executor + to: [executor, reviewer, planner] + router: simple_plan_router + - from: reviewer + to: [end] + end: [reviewer] +""" + +result = await AriumBuilder().from_yaml(yaml_str=yaml_config).build_and_run(["Your complex task"]) +``` + +**🎯 Use Cases and Examples:** + +- πŸ“± **App Development**: "Build a todo app with React and Node.js" +- πŸ›’ **E-commerce**: "Create a shopping cart system with payment processing" +- πŸ“Š **Data Pipeline**: "Build ETL pipeline for customer analytics" +- πŸ” **Security**: "Implement OAuth2 authentication system" +- πŸ“ˆ **Analytics**: "Create real-time dashboard with user metrics" + +The PlanExecuteRouter brings Cursor-style intelligent task automation to Flo AI, making it incredibly easy to build sophisticated multi-step workflows that adapt and execute complex tasks automatically! πŸš€ + #### YAML Workflow with Variables ```yaml diff --git a/flo_ai/examples/concept_demo.py b/flo_ai/examples/concept_demo.py new file mode 100644 index 00000000..ab573783 --- /dev/null +++ b/flo_ai/examples/concept_demo.py @@ -0,0 +1,284 @@ +""" +PlanExecuteRouter Concept Demo + +This demonstrates the plan-execute concept without requiring API calls. +Shows the architecture and explains how to fix the planner loop issue. 
+""" + +import asyncio +from flo_ai.arium.memory import PlanAwareMemory, ExecutionPlan, PlanStep, StepStatus +import uuid + + +def demonstrate_plan_aware_memory(): + """Show how PlanAwareMemory works with ExecutionPlan objects""" + print('πŸ“‹ PlanAwareMemory Concept Demo') + print('=' * 35) + + # Create memory + memory = PlanAwareMemory() + + # Create a sample execution plan + plan = ExecutionPlan( + id=str(uuid.uuid4()), + title='Build User Authentication API', + description='Create complete user auth system with registration and login', + steps=[ + PlanStep( + id='design_schema', + description='Design user database schema', + agent='developer', + status=StepStatus.PENDING, + ), + PlanStep( + id='implement_registration', + description='Implement user registration endpoint', + agent='developer', + dependencies=['design_schema'], + status=StepStatus.PENDING, + ), + PlanStep( + id='implement_login', + description='Implement user login with JWT tokens', + agent='developer', + dependencies=['design_schema', 'implement_registration'], + status=StepStatus.PENDING, + ), + PlanStep( + id='test_endpoints', + description='Test all authentication endpoints', + agent='tester', + dependencies=['implement_login'], + status=StepStatus.PENDING, + ), + PlanStep( + id='security_review', + description='Review security implementation', + agent='reviewer', + dependencies=['test_endpoints'], + status=StepStatus.PENDING, + ), + ], + ) + + # Store plan in memory + memory.add_plan(plan) + print(f'βœ… Plan stored: {plan.title}') + print(f'πŸ“Š Total steps: {len(plan.steps)}') + + # Show initial state + def show_plan_status(): + current = memory.get_current_plan() + print(f'\nπŸ“‹ Plan Status: {current.title}') + for step in current.steps: + status_icon = { + StepStatus.PENDING: 'β—‹', + StepStatus.IN_PROGRESS: '⏳', + StepStatus.COMPLETED: 'βœ…', + StepStatus.FAILED: '❌', + }.get(step.status, 'β—‹') + deps = ( + f" (depends: {', '.join(step.dependencies)})" + if step.dependencies + else '' 
+ ) + print(f' {status_icon} {step.id}: {step.description} β†’ {step.agent}{deps}') + + show_plan_status() + + # Simulate step execution + print('\nπŸ”„ Simulating step-by-step execution...') + + current_plan = memory.get_current_plan() + + # Execute step 1 + next_steps = current_plan.get_next_steps() + print(f'\n🎯 Next steps ready: {len(next_steps)}') + step1 = next_steps[0] + print(f'⏳ Executing: {step1.description}') + step1.status = StepStatus.COMPLETED + step1.result = 'User table created with id, email, password_hash fields' + memory.update_plan(current_plan) + + # Execute step 2 + next_steps = current_plan.get_next_steps() + print(f'\n🎯 Next steps ready: {len(next_steps)}') + step2 = next_steps[0] + print(f'⏳ Executing: {step2.description}') + step2.status = StepStatus.COMPLETED + step2.result = 'POST /register endpoint with validation implemented' + memory.update_plan(current_plan) + + show_plan_status() + + print(f'\nπŸ“ˆ Plan completion status: {current_plan.is_completed()}') + print(f'πŸ“Š Remaining steps: {len(current_plan.get_next_steps())}') + + +def explain_planner_loop_issue(): + """Explain why the planner got stuck in a loop and how to fix it""" + print('\n\nπŸ”„ Understanding the Planner Loop Issue') + print('=' * 45) + + explanation = """ +❌ THE PROBLEM: +In the original demo, the planner kept getting called in an infinite loop because: + +1. Router asks: "Is there a plan in memory?" +2. Memory says: "No ExecutionPlan objects found" +3. Router decides: "Route to planner to create plan" +4. Planner generates plan TEXT but doesn't store ExecutionPlan OBJECTS +5. Router asks again: "Is there a plan in memory?" +6. Memory still says: "No ExecutionPlan objects found" +7. INFINITE LOOP! 
πŸ”„ + +βœ… THE SOLUTION: +We need to bridge the gap between plan TEXT and plan OBJECTS: + +APPROACH 1: Specialized PlannerAgent +β€’ Custom agent that parses plan text and stores ExecutionPlan objects +β€’ Router can detect when ExecutionPlan exists in memory +β€’ Automatically switches from planning to execution phase + +APPROACH 2: Content-Based Routing +β€’ Router analyzes message content instead of relying on ExecutionPlan objects +β€’ If message contains "PLAN:", switch to execution mode +β€’ Simpler but less sophisticated + +APPROACH 3: State Management +β€’ Explicitly track workflow state (planning/executing/reviewing) +β€’ Router uses state instead of trying to detect plan completion +β€’ Most reliable but requires more setup +""" + + print(explanation) + + +def show_router_intelligence(): + """Show how the PlanExecuteRouter makes intelligent decisions""" + print('\n\n🧠 PlanExecuteRouter Intelligence') + print('=' * 35) + + intelligence_demo = """ +The PlanExecuteRouter is designed to coordinate complex workflows by: + +🎯 PHASE DETECTION: +β€’ Planning Phase: No plan exists β†’ route to planner +β€’ Execution Phase: Plan exists with pending steps β†’ route to executor +β€’ Review Phase: All steps complete β†’ route to reviewer +β€’ Error Recovery: Failed steps exist β†’ route for retry + +πŸ“Š STEP MANAGEMENT: +β€’ Analyzes step dependencies automatically +β€’ Only executes steps when dependencies are met +β€’ Tracks progress with visual indicators (β—‹ ⏳ βœ… ❌) +β€’ Handles parallel execution of independent steps + +πŸ”„ INTELLIGENT ROUTING: +β€’ Context-aware prompts with plan progress +β€’ Suggests next agent based on plan state +β€’ Prevents infinite loops with completion detection +β€’ Adapts to different workflow patterns + +πŸ’‘ EXAMPLE ROUTING DECISIONS: + +Scenario 1: No plan exists +β†’ Router: "Route to planner to create execution plan" + +Scenario 2: Plan exists, step_1 ready +β†’ Router: "Route to developer to execute step_1" + +Scenario 3: 
Development complete, testing needed +β†’ Router: "Route to tester to validate implementation" + +Scenario 4: All steps complete +β†’ Router: "Route to reviewer for final approval" + +Scenario 5: Step failed +β†’ Router: "Route to developer to retry failed step" +""" + + print(intelligence_demo) + + +def show_working_implementation(): + """Show the key components of a working implementation""" + print('\n\nπŸ—οΈ Working Implementation Components') + print('=' * 40) + + implementation = """ +For a working PlanExecuteRouter implementation, you need: + +1. πŸ“‹ PLAN STORAGE: + ```python + memory = PlanAwareMemory() # Stores ExecutionPlan objects + plan = ExecutionPlan(title="...", steps=[...]) + memory.add_plan(plan) + ``` + +2. πŸ€– SPECIALIZED AGENTS: + ```python + class PlannerAgent(Agent): + def run(self, input_data): + plan_text = await super().run(input_data) + execution_plan = self.parse_plan(plan_text) + self.memory.add_plan(execution_plan) # KEY! + return plan_text + ``` + +3. 🎯 SMART ROUTING: + ```python + router = create_plan_execute_router( + planner_agent='planner', + executor_agent='developer', + reviewer_agent='reviewer' + ) + ``` + +4. 
πŸ”„ WORKFLOW COORDINATION: + ```python + arium = AriumBuilder() + .with_memory(memory) # PlanAwareMemory + .add_agents([planner, developer, tester, reviewer]) + .add_edge(planner, [...], router) + .build() + ``` + +🎯 CRITICAL SUCCESS FACTORS: +β€’ PlannerAgent MUST store ExecutionPlan objects +β€’ Use PlanAwareMemory for plan state persistence +β€’ Router needs to detect plan existence reliably +β€’ Agents should update step status during execution +""" + + print(implementation) + + +async def main(): + """Main concept demo""" + print('🎯 PlanExecuteRouter Concept Demo') + print('Understanding the architecture and fixing the planner loop\n') + + # Demonstrate core concepts + demonstrate_plan_aware_memory() + explain_planner_loop_issue() + show_router_intelligence() + show_working_implementation() + + print('\n\nπŸŽ‰ Concept Demo Complete!') + print('=' * 30) + print('Key Takeaways:') + print('βœ… PlanExecuteRouter enables Cursor-style plan-and-execute workflows') + print('βœ… Planner loop occurs when plan TEXT β‰  plan OBJECTS in memory') + print('βœ… Solution: Bridge the gap with specialized agents or content parsing') + print('βœ… Working implementation requires PlanAwareMemory + ExecutionPlan objects') + + print('\nπŸš€ Next Steps:') + print('β€’ Try fixed_plan_execute_demo.py for working implementation') + print('β€’ Use PlannerAgent that stores ExecutionPlan objects') + print('β€’ Implement step status tracking for real progress monitoring') + print('β€’ Customize agents for your specific workflow needs') + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/flo_ai/examples/fixed_plan_execute_demo.py b/flo_ai/examples/fixed_plan_execute_demo.py new file mode 100644 index 00000000..a5976c8b --- /dev/null +++ b/flo_ai/examples/fixed_plan_execute_demo.py @@ -0,0 +1,401 @@ +""" +Fixed PlanExecuteRouter Demo - Resolves the planner loop issue + +The previous demo had an issue where the planner kept looping because the plan +wasn't being properly stored 
in PlanAwareMemory. This version fixes that by: + +1. Creating a special PlannerAgent that stores ExecutionPlan objects +2. Ensuring the router can detect when plans are created +3. Proper integration between plan creation and execution phases +""" + +import asyncio +import os +import uuid +import re +from flo_ai.models.agent import Agent +from flo_ai.llm import OpenAI +from flo_ai.arium.memory import PlanAwareMemory, ExecutionPlan, PlanStep, StepStatus +from flo_ai.arium.llm_router import create_plan_execute_router +from flo_ai.arium import AriumBuilder + + +class PlannerAgent(Agent): + """Special agent that creates ExecutionPlan objects and stores them in memory""" + + def __init__(self, memory: PlanAwareMemory, **kwargs): + super().__init__(**kwargs) + self.memory = memory + + async def run(self, input_data, **kwargs): + """Override run to create and store ExecutionPlan after generating plan text""" + + # First, generate the plan text using the normal agent behavior + plan_text = await super().run(input_data, **kwargs) + + # Parse the plan text and create ExecutionPlan object + execution_plan = self._parse_plan_text(plan_text) + + # Store the plan in memory + if execution_plan: + self.memory.add_plan(execution_plan) + print(f'βœ… Plan stored in memory: {execution_plan.title}') + print(f'πŸ“Š Steps: {len(execution_plan.steps)}') + + # Show the plan + for i, step in enumerate(execution_plan.steps, 1): + deps = ( + f" (depends: {', '.join(step.dependencies)})" + if step.dependencies + else '' + ) + print(f' {i}. 
{step.id}: {step.description} β†’ {step.agent}{deps}') + + return plan_text + + def _parse_plan_text(self, plan_text: str) -> ExecutionPlan: + """Parse LLM-generated plan text into ExecutionPlan object""" + + # Extract title + title_match = re.search(r'EXECUTION PLAN:\s*(.+)', plan_text) + title = title_match.group(1).strip() if title_match else 'Generated Plan' + + # Extract description + desc_match = re.search(r'DESCRIPTION:\s*(.+)', plan_text) + description = desc_match.group(1).strip() if desc_match else 'Execution plan' + + # Extract steps using regex + steps = [] + step_pattern = ( + r'(\d+)\.\s*(\w+):\s*(.+?)\s*β†’\s*(\w+)(?:\s*\(depends on:\s*([^)]+)\))?' + ) + + for match in re.finditer(step_pattern, plan_text, re.MULTILINE): + step_num, step_id, step_desc, agent, deps_str = match.groups() + + dependencies = [] + if deps_str: + dependencies = [dep.strip() for dep in deps_str.split(',')] + + step = PlanStep( + id=step_id, + description=step_desc.strip(), + agent=agent, + dependencies=dependencies, + status=StepStatus.PENDING, + ) + steps.append(step) + + return ExecutionPlan( + id=str(uuid.uuid4()), + title=title, + description=description, + steps=steps, + created_by=self.name, + ) + + +class ExecutorAgent(Agent): + """Special agent that marks steps as completed when executing them""" + + def __init__(self, memory: PlanAwareMemory, **kwargs): + super().__init__(**kwargs) + self.memory = memory + + async def run(self, input_data, **kwargs): + """Override run to update step status after execution""" + + # Get current plan and next steps + current_plan = self.memory.get_current_plan() + if current_plan: + next_steps = current_plan.get_next_steps() + + # Find steps assigned to this agent + my_steps = [step for step in next_steps if step.agent == self.name] + + if my_steps: + step = my_steps[0] # Execute first available step + print(f'⏳ Executing step: {step.id} - {step.description}') + + # Mark step as in progress + step.status = StepStatus.IN_PROGRESS + 
self.memory.update_plan(current_plan) + + # Execute the step using normal agent behavior + result = await super().run( + f'Execute this step: {step.description}. Context: {input_data}', + **kwargs, + ) + + # Mark step as completed + step.status = StepStatus.COMPLETED + step.result = result + self.memory.update_plan(current_plan) + + print(f'βœ… Completed step: {step.id}') + return result + + # If no steps to execute, just run normally + return await super().run(input_data, **kwargs) + + +async def run_fixed_demo(): + """Run the fixed plan-execute demo""" + print('πŸ”§ Fixed PlanExecuteRouter Demo') + print('=' * 40) + + # Check API key + api_key = os.getenv('OPENAI_API_KEY') + if not api_key: + print('❌ OPENAI_API_KEY environment variable not set') + print(' Set it with: export OPENAI_API_KEY=your_key_here') + return + + print('βœ… OpenAI API key found') + + # Create LLM + llm = OpenAI(model='gpt-4o', api_key=api_key) + + # Create plan-aware memory + memory = PlanAwareMemory() + + # Create special planner that stores ExecutionPlan objects + planner = PlannerAgent( + memory=memory, + name='planner', + system_prompt="""You are a project planner. Create execution plans in this EXACT format: + +EXECUTION PLAN: [Clear Title] +DESCRIPTION: [Brief description] + +STEPS: +1. step_1: [Description] β†’ developer +2. step_2: [Description] β†’ developer (depends on: step_1) +3. step_3: [Description] β†’ tester (depends on: step_2) +4. step_4: [Description] β†’ reviewer (depends on: step_3) + +Use only these agents: developer, tester, reviewer +Keep steps clear and actionable. +Always include dependencies where steps must be done in sequence.""", + llm=llm, + ) + + # Create executor agents that update step status + developer = ExecutorAgent( + memory=memory, + name='developer', + system_prompt="""You are a developer executing implementation steps. 
+Provide detailed implementation for the given step.""", + llm=llm, + ) + + tester = ExecutorAgent( + memory=memory, + name='tester', + system_prompt="""You are a tester validating implementations. +Create test scenarios and validate the implementation.""", + llm=llm, + ) + + reviewer = Agent( + name='reviewer', + system_prompt="""You are a reviewer providing final validation. +Review all completed work and give final approval.""", + llm=llm, + ) + + print('βœ… Created special agents with plan integration') + + # Create plan-execute router + plan_router = create_plan_execute_router( + planner_agent='planner', + executor_agent='developer', + reviewer_agent='reviewer', + additional_agents={'tester': 'Tests implementations'}, + llm=llm, + ) + + print('βœ… Created PlanExecuteRouter') + + # Create workflow + arium = ( + AriumBuilder() + .with_memory(memory) + .add_agents([planner, developer, tester, reviewer]) + .start_with(planner) + .add_edge(planner, [developer, tester, reviewer, planner], plan_router) + .add_edge(developer, [tester, reviewer, developer, planner], plan_router) + .add_edge(tester, [developer, reviewer, tester, planner], plan_router) + .add_edge(reviewer, [developer, tester, reviewer, planner], plan_router) + .end_with(reviewer) + .build() + ) + + print('βœ… Built Arium workflow') + + # Task to execute + task = 'Create a simple user login API endpoint' + + print(f'\nπŸ“‹ Task: {task}') + print('\nπŸ”„ Starting fixed plan-execute workflow...') + print(' This should now properly:') + print(' 1. Create and store execution plan') + print(' 2. Execute steps sequentially') + print(' 3. 
Track progress through completion') + + try: + # Execute workflow + result = await arium.run([task]) + + print('\n' + '=' * 50) + print('πŸŽ‰ WORKFLOW COMPLETED!') + print('=' * 50) + + # Show results + if result: + final_result = result[-1] if isinstance(result, list) else result + print('\nπŸ“„ Final Output:') + print('-' * 30) + print(final_result) + + # Show execution plan status + current_plan = memory.get_current_plan() + if current_plan: + print(f'\nπŸ“Š Final Plan Status: {current_plan.title}') + print(f'Completed: {current_plan.is_completed()}') + + print('\nπŸ“‹ Step Status:') + for step in current_plan.steps: + status_icon = { + StepStatus.PENDING: 'β—‹', + StepStatus.IN_PROGRESS: '⏳', + StepStatus.COMPLETED: 'βœ…', + StepStatus.FAILED: '❌', + }.get(step.status, 'β—‹') + print(f' {status_icon} {step.id}: {step.description} β†’ {step.agent}') + if step.result: + print(f' Result: {step.result[:100]}...') + + print('\nπŸ’‘ What was fixed:') + print(' β€’ PlannerAgent now creates and stores ExecutionPlan objects') + print(' β€’ ExecutorAgent updates step status during execution') + print(' β€’ Router can detect when plans exist in memory') + print(' β€’ No more infinite loops in the planner!') + + except Exception as e: + print(f'\n❌ Error: {e}') + print('Check the logs above for more details.') + + +async def run_simple_test(): + """Run a simple test to verify the fix works""" + print('\nπŸ“‹ Simple Plan Creation Test') + print('=' * 35) + + api_key = os.getenv('OPENAI_API_KEY') + if not api_key: + print('❌ OPENAI_API_KEY not set') + return + + # Create memory and planner + memory = PlanAwareMemory() + llm = OpenAI(model='gpt-4o', api_key=api_key) + + planner = PlannerAgent( + memory=memory, + name='planner', + system_prompt="""Create a plan in this format: + +EXECUTION PLAN: [Title] +DESCRIPTION: [Description] + +STEPS: +1. step_1: [Task] β†’ developer +2. step_2: [Task] β†’ tester (depends on: step_1) +3. 
step_3: [Task] β†’ reviewer (depends on: step_2)""", + llm=llm, + ) + + print('πŸ”„ Testing plan creation and storage...') + + try: + # Create a plan + await planner.run('Create a plan for building a simple calculator API') + + # Check if plan was stored + current_plan = memory.get_current_plan() + if current_plan: + print('βœ… Plan successfully created and stored!') + print(f'βœ… Title: {current_plan.title}') + print(f'βœ… Steps: {len(current_plan.steps)}') + + # Test next steps + next_steps = current_plan.get_next_steps() + print(f'βœ… Ready to execute: {len(next_steps)} steps') + + for step in next_steps: + print(f' β†’ {step.id}: {step.agent}') + else: + print('❌ Plan was not stored in memory') + + except Exception as e: + print(f'❌ Error: {e}') + + +def show_fix_explanation(): + """Explain what was fixed""" + print('\nπŸ”§ What Was Fixed') + print('=' * 20) + + explanation = """ +❌ Problem: Planner Loop +The original demo had the planner stuck in a loop because: +β€’ Planner generated plan text but didn't store ExecutionPlan objects +β€’ Router couldn't detect that a plan existed in memory +β€’ Router kept routing back to planner infinitely + +βœ… Solution: Special Agent Classes +Created specialized agents that integrate with PlanAwareMemory: +β€’ PlannerAgent: Parses plan text and stores ExecutionPlan objects +β€’ ExecutorAgent: Updates step status during execution +β€’ Proper integration between plan creation and execution + +🎯 Key Changes: +1. PlannerAgent.run() now creates and stores ExecutionPlan objects +2. ExecutorAgent.run() marks steps as in-progress/completed +3. Router can detect when plans exist and route accordingly +4. 
Proper step-by-step execution with status tracking + +πŸš€ Result: +β€’ No more infinite loops +β€’ Proper plan-execute workflow +β€’ Real-time step progress tracking +β€’ Seamless integration with PlanAwareMemory +""" + + print(explanation) + + +async def main(): + """Main demo function""" + print('🎯 Fixed PlanExecuteRouter Demo with Real OpenAI LLM') + print('This version fixes the planner loop issue!\n') + + if not os.getenv('OPENAI_API_KEY'): + print('❌ OPENAI_API_KEY not set') + print(' Set it with: export OPENAI_API_KEY=your_key_here') + return + + # Show what was fixed + show_fix_explanation() + + # Run tests + await run_simple_test() + await run_fixed_demo() + + print('\n\nπŸŽ‰ Fixed Demo Complete!') + print('The PlanExecuteRouter now works properly without infinite loops! πŸš€') + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/flo_ai/examples/flow_router_example.py b/flo_ai/examples/flow_router_example.py new file mode 100644 index 00000000..2da1e6c6 --- /dev/null +++ b/flo_ai/examples/flow_router_example.py @@ -0,0 +1,454 @@ +""" +Example demonstrating ReflectionRouter for A -> B -> A -> C patterns. + +This example shows how to implement a main -> critic -> main -> final reflection workflow +using the new ReflectionRouter with YAML configuration. +""" + +import asyncio +from flo_ai.arium.builder import AriumBuilder +from flo_ai.llm import OpenAI + +# Example YAML configuration for A -> B -> A -> C flow +MAIN_CRITIC_FLOW_YAML = """ +metadata: + name: main-critic-final-workflow + version: 1.0.0 + description: "A workflow demonstrating A -> B -> A -> C pattern with intelligent flow routing" + +arium: + agents: + - name: main_agent + role: Main Agent + job: > + You are the main agent responsible for analyzing tasks and creating initial solutions. + When you receive input, analyze it thoroughly and provide an initial response. + If you receive feedback from the critic, incorporate it to improve your work. 
+ Be receptive to criticism and use it to refine your output. + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.7 + + - name: critic + role: Critic Agent + job: > + You are a critic agent. Your job is to review the main agent's work and provide + constructive feedback. Analyze the output for: + - Accuracy and correctness + - Completeness and thoroughness + - Clarity and coherence + - Areas for improvement + Provide specific, actionable feedback that the main agent can use to improve. + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.3 + + - name: final_agent + role: Final Agent + job: > + You are the final agent responsible for polishing and finalizing the work. + Take the refined output from the main agent (after critic feedback) and: + - Format it professionally + - Add any final touches or improvements + - Ensure it meets high quality standards + - Provide a polished final result + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.5 + + # Reflection router configuration for A -> B -> A -> C pattern + routers: + - name: main_critic_reflection_router + type: reflection + flow_pattern: [main_agent, critic, main_agent, final_agent] + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.2 + allow_early_exit: false + fallback_strategy: first + + workflow: + start: main_agent + edges: + # Single edge from main_agent using reflection router + # The router will intelligently route to: critic -> main_agent -> final_agent + - from: main_agent + to: [critic, final_agent] # All possible destinations + router: main_critic_reflection_router + - from: critic + to: [main_agent, final_agent] + router: main_critic_reflection_router + - from: final_agent + to: [end] + end: [final_agent] +""" + +# Alternative stricter flow pattern +STRICT_FLOW_YAML = """ +metadata: + name: strict-main-critic-flow + version: 1.0.0 + description: "Strict A -> B -> A -> C flow with no deviations 
allowed" + +arium: + agents: + - name: writer + role: Content Writer + job: > + You are a content writer. Create initial content based on the user's request. + Focus on getting the core ideas down first, don't worry about perfection. + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.8 + + - name: reviewer + role: Content Reviewer + job: > + You are a content reviewer. Review the writer's work and provide detailed feedback: + - What works well + - What needs improvement + - Specific suggestions for enhancement + - Areas that need clarification + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.2 + + - name: editor + role: Content Editor + job: > + You are the final editor. Take the revised content from the writer and: + - Polish the language and style + - Ensure consistency and flow + - Make final corrections + - Prepare the content for publication + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.3 + + routers: + - name: strict_reflection_router + type: reflection + flow_pattern: [writer, reviewer, writer, editor] + settings: + allow_early_exit: false # Strict adherence to pattern + fallback_strategy: first + + workflow: + start: writer + edges: + - from: writer + to: [reviewer, editor] + router: strict_reflection_router + - from: reviewer + to: [writer, editor] + router: strict_reflection_router + - from: editor + to: [end] + end: [editor] +""" + +# Flexible flow that allows early exit +FLEXIBLE_FLOW_YAML = """ +metadata: + name: flexible-flow-with-early-exit + version: 1.0.0 + description: "Flexible A -> B -> A -> C flow that allows early completion" + +arium: + agents: + - name: analyst + role: Data Analyst + job: > + You are a data analyst. Analyze the given data or question and provide insights. + Create clear, actionable analysis based on the information provided. 
+ model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.5 + + - name: validator + role: Analysis Validator + job: > + You are an analysis validator. Review the analyst's work for: + - Logical consistency + - Accuracy of conclusions + - Completeness of analysis + - Potential issues or gaps + If the analysis is solid, you can recommend proceeding directly to completion. + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.1 + + - name: presenter + role: Results Presenter + job: > + You are a results presenter. Take the final analysis and create a professional + presentation of the findings with clear recommendations and next steps. + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.4 + + routers: + - name: flexible_reflection_router + type: reflection + flow_pattern: [analyst, validator, analyst, presenter] + settings: + allow_early_exit: true # Allow skipping steps if appropriate + fallback_strategy: first + + workflow: + start: analyst + edges: + - from: analyst + to: [validator, presenter] + router: flexible_reflection_router + - from: validator + to: [analyst, presenter] + router: flexible_reflection_router + - from: presenter + to: [end] + end: [presenter] +""" + + +async def run_main_critic_flow_example(): + """Example 1: Main -> Critic -> Main -> Final flow""" + print('πŸš€ EXAMPLE 1: Main -> Critic -> Main -> Final Flow') + print('=' * 60) + + # Create workflow from YAML + builder = AriumBuilder.from_yaml( + yaml_str=MAIN_CRITIC_FLOW_YAML, + base_llm=OpenAI(model='gpt-4o-mini', api_key='dummy-key'), # Dummy key for demo + ) + + # Build the workflow + builder.build() + + print('βœ… Workflow built successfully!') + print('πŸ“‹ Reflection Pattern: main_agent β†’ critic β†’ main_agent β†’ final_agent') + print('🎯 The ReflectionRouter will:') + print(' 1. Start with main_agent for initial analysis') + print(' 2. Route to critic for feedback/reflection') + print(' 3. 
Return to main_agent for improvements') + print(' 4. Finally route to final_agent for polishing') + + # Test input + test_input = 'Write a comprehensive guide on sustainable urban planning' + print(f'\nπŸ“ Test Input: {test_input}') + print('πŸ’‘ This would follow the strict Aβ†’Bβ†’Aβ†’C pattern automatically!') + + +async def run_strict_flow_example(): + """Example 2: Strict flow with no deviations""" + print('\n\n🎯 EXAMPLE 2: Strict Writer -> Reviewer -> Writer -> Editor Flow') + print('=' * 70) + + builder = AriumBuilder.from_yaml( + yaml_str=STRICT_FLOW_YAML, + base_llm=OpenAI(model='gpt-4o-mini', api_key='dummy-key'), + ) + + builder.build() + + print('βœ… Strict workflow built successfully!') + print('πŸ“‹ Flow Pattern: writer β†’ reviewer β†’ writer β†’ editor') + print('πŸ”’ Features:') + print(' β€’ Strict adherence to pattern (allow_early_exit: false)') + print(' β€’ LLM cannot deviate from the Aβ†’Bβ†’Aβ†’C sequence') + print(' β€’ Execution context tracks progress through flow') + + test_input = 'Create a blog post about renewable energy trends in 2024' + print(f'\nπŸ“ Test Input: {test_input}') + + +async def run_flexible_flow_example(): + """Example 3: Flexible flow with early exit option""" + print('\n\n🌟 EXAMPLE 3: Flexible Flow with Early Exit Option') + print('=' * 60) + + builder = AriumBuilder.from_yaml( + yaml_str=FLEXIBLE_FLOW_YAML, + base_llm=OpenAI(model='gpt-4o-mini', api_key='dummy-key'), + ) + + builder.build() + + print('βœ… Flexible workflow built successfully!') + print('πŸ“‹ Flow Pattern: analyst β†’ validator β†’ analyst β†’ presenter') + print('πŸ”“ Features:') + print(' β€’ Flexible routing (allow_early_exit: true)') + print(' β€’ LLM can skip steps if analysis is already sufficient') + print(' β€’ Smart adaptation based on conversation context') + + test_input = 'Analyze the quarterly sales data and identify key trends' + print(f'\nπŸ“ Test Input: {test_input}') + + +def demonstrate_reflection_router_features(): + """Show the 
key features of ReflectionRouter""" + print('\n\nπŸ“‹ ReflectionRouter Key Features') + print('=' * 50) + + features = """ +🎯 Reflection Pattern Tracking: + β€’ Automatically tracks progress through defined reflection sequence + β€’ Uses execution context (node_visit_count) for intelligent routing + β€’ Prevents infinite loops while allowing intentional revisits + +πŸ“Š Visual Progress Display: + β€’ Shows current position in pattern: β—‹ pending, βœ“ completed + β€’ Displays suggested next step based on reflection pattern + β€’ Provides clear feedback on workflow state + +βš™οΈ Configuration Options: + β€’ allow_early_exit: Enable/disable smart reflection termination + β€’ flow_pattern: Define exact sequence (e.g., [main, critic, main, final]) + β€’ Standard LLM router settings (temperature, fallback_strategy) + +πŸ”„ Execution Context Awareness: + β€’ Tracks how many times each node has been visited + β€’ Calculates expected visits based on reflection pattern position + β€’ Intelligently determines next step in sequence + +πŸ“ YAML Configuration: + ```yaml + routers: + - name: my_reflection_router + type: reflection + flow_pattern: [main_agent, critic, main_agent, final_agent] + settings: + allow_early_exit: false + temperature: 0.2 + ``` + +πŸ›‘οΈ Safety Features: + β€’ Inherits anti-infinite-loop mechanisms from base router + β€’ Provides clear error messages for configuration issues + β€’ Graceful fallback when pattern completion detected +""" + + print(features) + + +def show_yaml_schema(): + """Show complete YAML schema for ReflectionRouter""" + print('\n\nπŸ“„ Complete ReflectionRouter YAML Schema') + print('=' * 50) + + schema = """ +# Complete example with ReflectionRouter +metadata: + name: my-flow-workflow + version: 1.0.0 + description: "A -> B -> A -> C flow pattern example" + +arium: + agents: + - name: main_agent + role: Main Agent + job: "Your main agent job description" + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 
0.7 + + - name: critic + role: Critic + job: "Your critic agent job description" + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.3 + + - name: final_agent + role: Final Agent + job: "Your final agent job description" + model: + provider: openai + name: gpt-4o-mini + + routers: + - name: reflection_router + type: reflection # Router type + flow_pattern: [main_agent, critic, main_agent, final_agent] # A->B->A->C pattern + model: # Optional: LLM for routing decisions + provider: openai + name: gpt-4o-mini + settings: # Optional settings + temperature: 0.2 # Router temperature + allow_early_exit: false # Allow early completion + fallback_strategy: first # first, last, random + + workflow: + start: main_agent + edges: + - from: main_agent + to: [critic, final_agent] # All possible destinations + router: reflection_router # Use reflection router + - from: critic + to: [main_agent, final_agent] + router: reflection_router + - from: final_agent + to: [end] + end: [final_agent] +""" + print(schema) + + +async def main(): + """Run all examples""" + print('🌟 ReflectionRouter Examples - A β†’ B β†’ A β†’ C Pattern Implementation') + print('=' * 80) + print('This example demonstrates the new ReflectionRouter for implementing') + print('structured reflection patterns with intelligent LLM-based routing! 
πŸŽ‰') + + # Show features and schema first + demonstrate_reflection_router_features() + show_yaml_schema() + + # Run examples + await run_main_critic_flow_example() + await run_strict_flow_example() + await run_flexible_flow_example() + + print('\n\nπŸŽ‰ All ReflectionRouter examples completed!') + print('=' * 80) + print('βœ… ReflectionRouter Benefits:') + print(' β€’ Simple YAML configuration for complex reflection patterns') + print(' β€’ Automatic progress tracking through execution context') + print(' β€’ Intelligent routing decisions based on reflection state') + print(' β€’ Flexible vs strict reflection control options') + print(' β€’ Built-in safety features and loop prevention') + print(' β€’ Easy integration with existing Arium workflows') + + print('\nπŸš€ Try it yourself:') + print(' 1. Define your agents (main, critic, final)') + print(' 2. Create a reflection router with your pattern') + print(' 3. Configure workflow edges') + print(' 4. Run your Aβ†’Bβ†’Aβ†’C reflection workflow!') + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/flo_ai/examples/live_plan_execute_demo.py b/flo_ai/examples/live_plan_execute_demo.py new file mode 100644 index 00000000..bf56267f --- /dev/null +++ b/flo_ai/examples/live_plan_execute_demo.py @@ -0,0 +1,307 @@ +""" +Live PlanExecuteRouter Demo - Real OpenAI LLM Integration + +A focused demo showing PlanExecuteRouter with actual LLM calls. +Set OPENAI_API_KEY environment variable to run. + +This demonstrates: +1. Automatic task breakdown into execution plans +2. Step-by-step execution with progress tracking +3. 
Intelligent routing between planning and execution phases +""" + +import asyncio +import os +from flo_ai.models.agent import Agent +from flo_ai.llm import OpenAI +from flo_ai.arium.memory import PlanAwareMemory, ExecutionPlan, PlanStep, StepStatus +from flo_ai.arium.llm_router import create_plan_execute_router +from flo_ai.arium import AriumBuilder + + +class PlanParser: + """Helper to parse LLM-generated plans into ExecutionPlan objects""" + + @staticmethod + def parse_plan_from_text( + plan_text: str, planner_agent: str = 'planner' + ) -> ExecutionPlan: + """Parse structured plan text into ExecutionPlan object""" + import uuid + import re + + # Extract title and description + title_match = re.search(r'EXECUTION PLAN:\s*(.+)', plan_text) + title = title_match.group(1).strip() if title_match else 'Generated Plan' + + desc_match = re.search(r'DESCRIPTION:\s*(.+)', plan_text) + description = desc_match.group(1).strip() if desc_match else 'Execution plan' + + # Extract steps + steps = [] + step_pattern = ( + r'(\d+)\.\s*(\w+):\s*(.+?)\s*β†’\s*(\w+)(?:\s*\(depends on:\s*([^)]+)\))?' 
+ ) + + for match in re.finditer(step_pattern, plan_text): + step_num, step_id, step_desc, agent, deps_str = match.groups() + + dependencies = [] + if deps_str: + dependencies = [dep.strip() for dep in deps_str.split(',')] + + step = PlanStep( + id=step_id, + description=step_desc.strip(), + agent=agent, + dependencies=dependencies, + status=StepStatus.PENDING, + ) + steps.append(step) + + return ExecutionPlan( + id=str(uuid.uuid4()), + title=title, + description=description, + steps=steps, + created_by=planner_agent, + ) + + +async def run_live_demo(): + """Run a live demo with real OpenAI API calls""" + print('πŸš€ Live PlanExecuteRouter Demo') + print('=' * 40) + + # Check API key + api_key = os.getenv('OPENAI_API_KEY') + if not api_key: + print('❌ OPENAI_API_KEY environment variable not set') + print(' Set it with: export OPENAI_API_KEY=your_key_here') + return + + print('βœ… OpenAI API key found') + + # Create LLM + llm = OpenAI(model='gpt-4o-mini', api_key=api_key) + + # Create agents with specific roles + planner = Agent( + name='planner', + system_prompt="""You are a project planner. Create execution plans in this exact format: + +EXECUTION PLAN: [Title] +DESCRIPTION: [Brief description] + +STEPS: +1. step_1: [Description] β†’ [agent] +2. step_2: [Description] β†’ [agent] (depends on: step_1) +3. step_3: [Description] β†’ [agent] (depends on: step_1, step_2) + +Use agents: developer, tester, reviewer +Keep steps clear and actionable.""", + llm=llm, + ) + + developer = Agent( + name='developer', + system_prompt="""You are a developer executing specific implementation steps. +Acknowledge the step you're working on and provide detailed implementation.""", + llm=llm, + ) + + tester = Agent( + name='tester', + system_prompt="""You are a tester validating implementations. +Create test scenarios and report validation results.""", + llm=llm, + ) + + reviewer = Agent( + name='reviewer', + system_prompt="""You are a reviewer providing final validation. 
+Review completed work and give final approval.""", + llm=llm, + ) + + print('βœ… Created agents: planner, developer, tester, reviewer') + + # Create memory and router + memory = PlanAwareMemory() + + plan_router = create_plan_execute_router( + planner_agent='planner', + executor_agent='developer', + reviewer_agent='reviewer', + additional_agents={'tester': 'Tests implementations'}, + llm=llm, + ) + + print('βœ… Created PlanExecuteRouter with PlanAwareMemory') + + # Create workflow + arium = ( + AriumBuilder() + .with_memory(memory) + .add_agents([planner, developer, tester, reviewer]) + .start_with(planner) + .add_edge(planner, [developer, tester, reviewer, planner], plan_router) + .add_edge(developer, [tester, reviewer, developer, planner], plan_router) + .add_edge(tester, [developer, reviewer, tester, planner], plan_router) + .add_edge(reviewer, [developer, tester, reviewer, planner], plan_router) + .end_with(reviewer) + .build() + ) + + print('βœ… Built Arium workflow') + + # Task to execute + task = 'Create a simple user registration API with email validation' + + print(f'\nπŸ“‹ Task: {task}') + print('\nπŸ”„ Executing plan-and-execute workflow...') + print( + ' Watch as the router coordinates planning β†’ development β†’ testing β†’ review' + ) + + try: + # Execute workflow + result = await arium.run([task]) + + print('\n' + '=' * 50) + print('πŸŽ‰ WORKFLOW COMPLETED!') + print('=' * 50) + + # Show results + if result: + final_result = result[-1] if isinstance(result, list) else result + print('\nπŸ“„ Final Output:') + print('-' * 30) + print(final_result) + + # Show execution plan if created + current_plan = memory.get_current_plan() + if current_plan: + print(f'\nπŸ“Š Execution Plan: {current_plan.title}') + print(f'Description: {current_plan.description}') + print(f'Steps: {len(current_plan.steps)}') + print(f'Completed: {current_plan.is_completed()}') + + print('\nπŸ“‹ Step Progress:') + for step in current_plan.steps: + status_icon = { + 
StepStatus.PENDING: 'β—‹', + StepStatus.IN_PROGRESS: '⏳', + StepStatus.COMPLETED: 'βœ…', + StepStatus.FAILED: '❌', + }.get(step.status, 'β—‹') + deps = ( + f" (deps: {', '.join(step.dependencies)})" + if step.dependencies + else '' + ) + print( + f' {status_icon} {step.id}: {step.description} β†’ {step.agent}{deps}' + ) + + print('\nπŸ’‘ What happened:') + print(' β€’ Router started by routing to planner') + print(' β€’ Planner created detailed execution plan') + print(' β€’ Router routed to developer for implementation steps') + print(' β€’ Router routed to tester for validation') + print(' β€’ Router routed to reviewer for final approval') + print(' β€’ Plan state tracked in PlanAwareMemory throughout') + + except Exception as e: + print(f'\n❌ Error: {e}') + print('This could be due to API rate limits or network issues.') + + +async def run_simple_plan_creation(): + """Demo just the plan creation part""" + print('\n\nπŸ“‹ Plan Creation Demo') + print('=' * 30) + + api_key = os.getenv('OPENAI_API_KEY') + if not api_key: + print('❌ OPENAI_API_KEY not set') + return + + llm = OpenAI(model='gpt-4o-mini', api_key=api_key) + + planner = Agent( + name='planner', + system_prompt="""Create an execution plan in this format: + +EXECUTION PLAN: [Title] +DESCRIPTION: [Description] + +STEPS: +1. step_1: [Task description] β†’ developer +2. step_2: [Task description] β†’ developer (depends on: step_1) +3. step_3: [Task description] β†’ tester (depends on: step_2) +4. 
step_4: [Task description] β†’ reviewer (depends on: step_3)""", + llm=llm, + ) + + task = 'Build a simple blog API with posts and comments' + print(f'Task: {task}') + print('\nπŸ”„ Generating execution plan...') + + try: + plan_text = await planner.run(f'Create a detailed execution plan for: {task}') + print('\nπŸ“‹ Generated Plan:') + print('-' * 30) + print(plan_text) + + # Parse into ExecutionPlan object + execution_plan = PlanParser.parse_plan_from_text(plan_text) + + print('\nβœ… Parsed into ExecutionPlan:') + print(f'Title: {execution_plan.title}') + print(f'Steps: {len(execution_plan.steps)}') + + for step in execution_plan.steps: + deps = ( + f" (depends: {', '.join(step.dependencies)})" + if step.dependencies + else '' + ) + print(f' β€’ {step.id}: {step.description} β†’ {step.agent}{deps}') + + except Exception as e: + print(f'❌ Error: {e}') + + +def show_setup_instructions(): + """Show how to set up and run the demo""" + print('πŸ“– Setup Instructions') + print('=' * 25) + print('1. Set your OpenAI API key:') + print(' export OPENAI_API_KEY=your_api_key_here') + print('\n2. Run the demo:') + print(' python examples/live_plan_execute_demo.py') + print('\n3. Watch the PlanExecuteRouter in action! πŸš€') + + +async def main(): + """Main demo function""" + print('🎯 Live PlanExecuteRouter Demo with Real OpenAI LLM') + print('This demo shows the full plan-and-execute workflow with actual API calls!\n') + + if not os.getenv('OPENAI_API_KEY'): + show_setup_instructions() + return + + # Run demos + await run_simple_plan_creation() + await run_live_demo() + + print('\n\nπŸŽ‰ Demo Complete!') + print( + 'You just saw PlanExecuteRouter break down a task and execute it step by step! 
πŸš€' + ) + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/flo_ai/examples/plan_execute_router_example.py b/flo_ai/examples/plan_execute_router_example.py new file mode 100644 index 00000000..a03132be --- /dev/null +++ b/flo_ai/examples/plan_execute_router_example.py @@ -0,0 +1,501 @@ +""" +Example demonstrating PlanExecuteRouter for Cursor-style plan-and-execute workflows. + +This example shows how to implement plan-and-execute patterns where tasks are broken down +into sequential steps and executed systematically, similar to how Cursor works. +""" + +import asyncio +from flo_ai.arium.builder import AriumBuilder +from flo_ai.llm import OpenAI + +# Example YAML configuration for Plan-Execute workflow +PLAN_EXECUTE_WORKFLOW_YAML = """ +metadata: + name: plan-execute-development-workflow + version: 1.0.0 + description: "Cursor-style plan-and-execute workflow for software development" + +arium: + agents: + - name: planner + role: Project Planner + job: > + You are a project planner who breaks down complex tasks into detailed, sequential steps. + Create comprehensive execution plans with clear dependencies and assigned agents. + Output your plan as a structured format that can be executed step by step. + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.3 + + - name: developer + role: Software Developer + job: > + You are a software developer who implements code based on specific requirements. + Execute development tasks step by step, focusing on clean, maintainable code. + Report on progress and any issues encountered during implementation. + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.5 + + - name: tester + role: Quality Assurance Tester + job: > + You are a QA tester who validates implementations and ensures quality. + Test code, identify bugs, and verify that requirements are met. + Provide detailed feedback on quality and functionality. 
+ model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.2 + + - name: reviewer + role: Code Reviewer + job: > + You are a code reviewer who provides final quality assessment. + Review completed work for best practices, maintainability, and correctness. + Provide final approval or request additional improvements. + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.1 + + # Plan-Execute router configuration + routers: + - name: plan_execute_router + type: plan_execute + agents: + planner: "Creates detailed execution plans by breaking down tasks into sequential steps" + developer: "Implements code and features according to plan specifications" + tester: "Tests implementations and validates functionality" + reviewer: "Reviews and validates completed work for final approval" + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.2 + planner_agent: planner + executor_agent: developer + reviewer_agent: reviewer + + workflow: + start: planner + edges: + - from: planner + to: [developer, tester, reviewer, planner] # All possible destinations + router: plan_execute_router + - from: developer + to: [tester, reviewer, developer, planner] + router: plan_execute_router + - from: tester + to: [developer, reviewer, tester, planner] + router: plan_execute_router + - from: reviewer + to: [end] + end: [reviewer] +""" + +# Simpler Plan-Execute workflow +SIMPLE_PLAN_EXECUTE_YAML = """ +metadata: + name: simple-plan-execute-workflow + version: 1.0.0 + description: "Simple plan-execute pattern for general tasks" + +arium: + agents: + - name: planner + role: Task Planner + job: > + Break down the given task into clear, actionable steps. + Create a detailed plan with dependencies and execution order. + model: + provider: openai + name: gpt-4o-mini + + - name: executor + role: Task Executor + job: > + Execute the steps from the plan one by one. + Focus on completing each step thoroughly before moving to the next. 
+ model: + provider: openai + name: gpt-4o-mini + + - name: validator + role: Quality Validator + job: > + Validate that all steps have been completed correctly. + Ensure the final result meets the original requirements. + model: + provider: openai + name: gpt-4o-mini + + routers: + - name: simple_plan_router + type: plan_execute + agents: + planner: "Creates execution plans" + executor: "Executes plan steps" + validator: "Validates results" + settings: + planner_agent: planner + executor_agent: executor + reviewer_agent: validator + + workflow: + start: planner + edges: + - from: planner + to: [executor, validator, planner] + router: simple_plan_router + - from: executor + to: [validator, executor, planner] + router: simple_plan_router + - from: validator + to: [end] + end: [validator] +""" + +# Research workflow with plan-execute pattern +RESEARCH_PLAN_EXECUTE_YAML = """ +metadata: + name: research-plan-execute-workflow + version: 1.0.0 + description: "Plan-execute workflow for research projects" + +arium: + agents: + - name: research_planner + role: Research Planner + job: > + Create comprehensive research plans by breaking down research questions + into specific investigation steps, data collection tasks, and analysis phases. + model: + provider: openai + name: gpt-4o-mini + + - name: researcher + role: Researcher + job: > + Conduct research according to the plan. Gather information, analyze data, + and document findings for each step of the research plan. + model: + provider: openai + name: gpt-4o-mini + + - name: analyst + role: Data Analyst + job: > + Analyze research data and findings. Identify patterns, draw conclusions, + and prepare analytical insights based on the collected information. + model: + provider: openai + name: gpt-4o-mini + + - name: synthesizer + role: Research Synthesizer + job: > + Synthesize all research findings into a comprehensive final report. + Ensure all research questions are addressed and conclusions are well-supported. 
+ model: + provider: openai + name: gpt-4o-mini + + routers: + - name: research_plan_router + type: plan_execute + agents: + research_planner: "Creates detailed research execution plans" + researcher: "Conducts research and gathers information" + analyst: "Analyzes data and identifies patterns" + synthesizer: "Creates final comprehensive reports" + settings: + planner_agent: research_planner + executor_agent: researcher + reviewer_agent: synthesizer + + workflow: + start: research_planner + edges: + - from: research_planner + to: [researcher, analyst, synthesizer, research_planner] + router: research_plan_router + - from: researcher + to: [analyst, synthesizer, researcher, research_planner] + router: research_plan_router + - from: analyst + to: [synthesizer, analyst, researcher, research_planner] + router: research_plan_router + - from: synthesizer + to: [end] + end: [synthesizer] +""" + + +async def run_development_workflow_example(): + """Example 1: Full development workflow with plan-execute pattern""" + print('πŸš€ EXAMPLE 1: Development Workflow with Plan-Execute Pattern') + print('=' * 65) + + # Create workflow from YAML + builder = AriumBuilder.from_yaml( + yaml_str=PLAN_EXECUTE_WORKFLOW_YAML, + base_llm=OpenAI(model='gpt-4o-mini', api_key='dummy-key'), # Dummy key for demo + ) + + # Build the workflow + builder.build() + + print('βœ… Development workflow built successfully!') + print('πŸ“‹ Plan-Execute Pattern: Planner β†’ Developer β†’ Tester β†’ Reviewer') + print('🎯 The PlanExecuteRouter will:') + print(' 1. Start with planner to create detailed execution plan') + print(' 2. Route to developer for step-by-step implementation') + print(' 3. Route to tester for quality validation') + print(' 4. Route to reviewer for final approval') + print(' 5. 
Track progress through each step with visual indicators') + + # Test input + test_input = 'Create a REST API for user authentication with JWT tokens' + print(f'\nπŸ“ Test Input: {test_input}') + print('πŸ’‘ Expected plan steps:') + print(' β—‹ Design API endpoints and data models') + print(' β—‹ Implement user registration endpoint') + print(' β—‹ Implement login endpoint with JWT generation') + print(' β—‹ Add authentication middleware') + print(' β—‹ Create comprehensive tests') + print(' β—‹ Review and optimize code') + + +async def run_simple_workflow_example(): + """Example 2: Simple plan-execute workflow""" + print('\n\n🎯 EXAMPLE 2: Simple Plan-Execute Workflow') + print('=' * 50) + + builder = AriumBuilder.from_yaml( + yaml_str=SIMPLE_PLAN_EXECUTE_YAML, + base_llm=OpenAI(model='gpt-4o-mini', api_key='dummy-key'), + ) + + builder.build() + + print('βœ… Simple workflow built successfully!') + print('πŸ“‹ Plan-Execute Pattern: Planner β†’ Executor β†’ Validator') + print('🎯 Features:') + print(' β€’ Automatic task breakdown by planner') + print(' β€’ Sequential step execution') + print(' β€’ Progress tracking with plan state management') + print(' β€’ Quality validation before completion') + + test_input = 'Organize a team building event for 20 people' + print(f'\nπŸ“ Test Input: {test_input}') + print('πŸ’‘ Expected workflow:') + print(' 1. Planner creates detailed event plan') + print(' 2. Executor handles each step (venue, catering, activities)') + print(' 3. 
Validator ensures everything is properly organized') + + +async def run_research_workflow_example(): + """Example 3: Research workflow with plan-execute pattern""" + print('\n\nπŸ”¬ EXAMPLE 3: Research Plan-Execute Workflow') + print('=' * 50) + + builder = AriumBuilder.from_yaml( + yaml_str=RESEARCH_PLAN_EXECUTE_YAML, + base_llm=OpenAI(model='gpt-4o-mini', api_key='dummy-key'), + ) + + builder.build() + + print('βœ… Research workflow built successfully!') + print( + 'πŸ“‹ Plan-Execute Pattern: Research Planner β†’ Researcher β†’ Analyst β†’ Synthesizer' + ) + print('🎯 Features:') + print(' β€’ Structured research methodology') + print(' β€’ Data collection and analysis phases') + print(' β€’ Comprehensive synthesis and reporting') + print(' β€’ Academic-quality research process') + + test_input = 'Research the impact of remote work on employee productivity' + print(f'\nπŸ“ Test Input: {test_input}') + print('πŸ’‘ Expected research plan:') + print(' β—‹ Literature review on remote work studies') + print(' β—‹ Survey design and data collection') + print(' β—‹ Statistical analysis of productivity metrics') + print(' β—‹ Qualitative analysis of employee feedback') + print(' β—‹ Final report with recommendations') + + +def demonstrate_plan_execute_features(): + """Show the key features of PlanExecuteRouter""" + print('\n\nπŸ“‹ PlanExecuteRouter Key Features') + print('=' * 45) + + features = """ +🎯 Cursor-Style Planning: + β€’ Automatic task breakdown into sequential steps + β€’ Dependency tracking between steps + β€’ Agent assignment for each step + β€’ Progress visualization with status indicators + +πŸ“Š Plan Management: + β€’ ExecutionPlan storage in enhanced memory + β€’ Step status tracking (pending, in_progress, completed, failed) + β€’ Automatic next-step determination + β€’ Failed step recovery and retry logic + +πŸ”„ Execution Flow: + β€’ Phase 1: Planning (create detailed execution plan) + β€’ Phase 2: Execution (complete steps sequentially) + β€’ Phase 3: 
Review (validate final results) + β€’ Automatic routing between phases + +βš™οΈ Configuration Options: + β€’ agents: Define available agents and their capabilities + β€’ planner_agent: Agent responsible for creating plans + β€’ executor_agent: Default agent for executing steps + β€’ reviewer_agent: Optional agent for final review + +πŸ›‘οΈ Safety Features: + β€’ Prevents infinite loops with step completion tracking + β€’ Handles failed steps with recovery mechanisms + β€’ Memory persistence for plan state + β€’ Execution context awareness + +πŸ“ YAML Configuration: + ```yaml + routers: + - name: plan_execute_router + type: plan_execute + agents: + planner: "Creates execution plans" + developer: "Implements features" + tester: "Validates quality" + settings: + planner_agent: planner + executor_agent: developer + reviewer_agent: tester + ``` + +πŸ”§ Memory Integration: + β€’ Uses PlanAwareMemory for plan storage + β€’ Automatic plan state persistence + β€’ Step result tracking and history + β€’ Context sharing between execution steps +""" + + print(features) + + +def show_yaml_schema(): + """Show complete YAML schema for PlanExecuteRouter""" + print('\n\nπŸ“„ Complete PlanExecuteRouter YAML Schema') + print('=' * 55) + + schema = """ +# Complete example with PlanExecuteRouter +metadata: + name: my-plan-execute-workflow + version: 1.0.0 + description: "Cursor-style plan-and-execute workflow" + +arium: + agents: + - name: planner + role: "Task Planner" + job: "Create detailed execution plans" + model: + provider: openai + name: gpt-4o-mini + + - name: executor + role: "Task Executor" + job: "Execute plan steps systematically" + model: + provider: openai + name: gpt-4o-mini + + - name: reviewer + role: "Quality Reviewer" + job: "Review and validate final results" + model: + provider: openai + name: gpt-4o-mini + + routers: + - name: plan_execute_router + type: plan_execute # Router type + agents: # Required: Available agents + planner: "Creates detailed execution 
plans" + executor: "Executes individual plan steps" + reviewer: "Reviews final results" + specialist: "Handles specialized tasks" + model: # Optional: LLM for routing + provider: openai + name: gpt-4o-mini + settings: # Optional settings + temperature: 0.2 # Router temperature + planner_agent: planner # Agent for creating plans + executor_agent: executor # Default execution agent + reviewer_agent: reviewer # Optional review agent + max_retries: 3 # Max retries for failed steps + + workflow: + start: planner + edges: + - from: planner + to: [executor, reviewer, specialist, planner] # All possible destinations + router: plan_execute_router + - from: executor + to: [reviewer, specialist, executor, planner] + router: plan_execute_router + - from: reviewer + to: [end] + end: [reviewer] +""" + print(schema) + + +async def main(): + """Run all examples""" + print('🌟 PlanExecuteRouter Examples - Cursor-Style Plan-and-Execute Workflows') + print('=' * 85) + print('This example demonstrates the new PlanExecuteRouter for implementing') + print('sophisticated plan-and-execute patterns with intelligent step tracking! πŸŽ‰') + + # Show features and schema first + demonstrate_plan_execute_features() + show_yaml_schema() + + # Run examples + await run_development_workflow_example() + await run_simple_workflow_example() + await run_research_workflow_example() + + print('\n\nπŸŽ‰ All PlanExecuteRouter examples completed!') + print('=' * 85) + print('βœ… PlanExecuteRouter Benefits:') + print(' β€’ Cursor-style automatic task breakdown and execution') + print(' β€’ Intelligent step-by-step progress tracking') + print(' β€’ Visual progress indicators with plan state management') + print(' β€’ Automatic routing based on execution plan progress') + print(' β€’ Built-in error handling and step retry mechanisms') + print(' β€’ Enhanced memory system with plan persistence') + + print('\nπŸš€ Try it yourself:') + print(' 1. Define your planner, executor, and reviewer agents') + print(' 2. 
Create a plan-execute router with agent mappings') + print(' 3. Use PlanAwareMemory for plan state persistence') + print(' 4. Run complex tasks with automatic breakdown!') + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/flo_ai/examples/reflection_router_example.py b/flo_ai/examples/reflection_router_example.py new file mode 100644 index 00000000..2da1e6c6 --- /dev/null +++ b/flo_ai/examples/reflection_router_example.py @@ -0,0 +1,454 @@ +""" +Example demonstrating ReflectionRouter for A -> B -> A -> C patterns. + +This example shows how to implement a main -> critic -> main -> final reflection workflow +using the new ReflectionRouter with YAML configuration. +""" + +import asyncio +from flo_ai.arium.builder import AriumBuilder +from flo_ai.llm import OpenAI + +# Example YAML configuration for A -> B -> A -> C flow +MAIN_CRITIC_FLOW_YAML = """ +metadata: + name: main-critic-final-workflow + version: 1.0.0 + description: "A workflow demonstrating A -> B -> A -> C pattern with intelligent flow routing" + +arium: + agents: + - name: main_agent + role: Main Agent + job: > + You are the main agent responsible for analyzing tasks and creating initial solutions. + When you receive input, analyze it thoroughly and provide an initial response. + If you receive feedback from the critic, incorporate it to improve your work. + Be receptive to criticism and use it to refine your output. + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.7 + + - name: critic + role: Critic Agent + job: > + You are a critic agent. Your job is to review the main agent's work and provide + constructive feedback. Analyze the output for: + - Accuracy and correctness + - Completeness and thoroughness + - Clarity and coherence + - Areas for improvement + Provide specific, actionable feedback that the main agent can use to improve. 
+ model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.3 + + - name: final_agent + role: Final Agent + job: > + You are the final agent responsible for polishing and finalizing the work. + Take the refined output from the main agent (after critic feedback) and: + - Format it professionally + - Add any final touches or improvements + - Ensure it meets high quality standards + - Provide a polished final result + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.5 + + # Reflection router configuration for A -> B -> A -> C pattern + routers: + - name: main_critic_reflection_router + type: reflection + flow_pattern: [main_agent, critic, main_agent, final_agent] + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.2 + allow_early_exit: false + fallback_strategy: first + + workflow: + start: main_agent + edges: + # Single edge from main_agent using reflection router + # The router will intelligently route to: critic -> main_agent -> final_agent + - from: main_agent + to: [critic, final_agent] # All possible destinations + router: main_critic_reflection_router + - from: critic + to: [main_agent, final_agent] + router: main_critic_reflection_router + - from: final_agent + to: [end] + end: [final_agent] +""" + +# Alternative stricter flow pattern +STRICT_FLOW_YAML = """ +metadata: + name: strict-main-critic-flow + version: 1.0.0 + description: "Strict A -> B -> A -> C flow with no deviations allowed" + +arium: + agents: + - name: writer + role: Content Writer + job: > + You are a content writer. Create initial content based on the user's request. + Focus on getting the core ideas down first, don't worry about perfection. + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.8 + + - name: reviewer + role: Content Reviewer + job: > + You are a content reviewer. 
Review the writer's work and provide detailed feedback: + - What works well + - What needs improvement + - Specific suggestions for enhancement + - Areas that need clarification + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.2 + + - name: editor + role: Content Editor + job: > + You are the final editor. Take the revised content from the writer and: + - Polish the language and style + - Ensure consistency and flow + - Make final corrections + - Prepare the content for publication + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.3 + + routers: + - name: strict_reflection_router + type: reflection + flow_pattern: [writer, reviewer, writer, editor] + settings: + allow_early_exit: false # Strict adherence to pattern + fallback_strategy: first + + workflow: + start: writer + edges: + - from: writer + to: [reviewer, editor] + router: strict_reflection_router + - from: reviewer + to: [writer, editor] + router: strict_reflection_router + - from: editor + to: [end] + end: [editor] +""" + +# Flexible flow that allows early exit +FLEXIBLE_FLOW_YAML = """ +metadata: + name: flexible-flow-with-early-exit + version: 1.0.0 + description: "Flexible A -> B -> A -> C flow that allows early completion" + +arium: + agents: + - name: analyst + role: Data Analyst + job: > + You are a data analyst. Analyze the given data or question and provide insights. + Create clear, actionable analysis based on the information provided. + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.5 + + - name: validator + role: Analysis Validator + job: > + You are an analysis validator. Review the analyst's work for: + - Logical consistency + - Accuracy of conclusions + - Completeness of analysis + - Potential issues or gaps + If the analysis is solid, you can recommend proceeding directly to completion. 
+ model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.1 + + - name: presenter + role: Results Presenter + job: > + You are a results presenter. Take the final analysis and create a professional + presentation of the findings with clear recommendations and next steps. + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.4 + + routers: + - name: flexible_reflection_router + type: reflection + flow_pattern: [analyst, validator, analyst, presenter] + settings: + allow_early_exit: true # Allow skipping steps if appropriate + fallback_strategy: first + + workflow: + start: analyst + edges: + - from: analyst + to: [validator, presenter] + router: flexible_reflection_router + - from: validator + to: [analyst, presenter] + router: flexible_reflection_router + - from: presenter + to: [end] + end: [presenter] +""" + + +async def run_main_critic_flow_example(): + """Example 1: Main -> Critic -> Main -> Final flow""" + print('πŸš€ EXAMPLE 1: Main -> Critic -> Main -> Final Flow') + print('=' * 60) + + # Create workflow from YAML + builder = AriumBuilder.from_yaml( + yaml_str=MAIN_CRITIC_FLOW_YAML, + base_llm=OpenAI(model='gpt-4o-mini', api_key='dummy-key'), # Dummy key for demo + ) + + # Build the workflow + builder.build() + + print('βœ… Workflow built successfully!') + print('πŸ“‹ Reflection Pattern: main_agent β†’ critic β†’ main_agent β†’ final_agent') + print('🎯 The ReflectionRouter will:') + print(' 1. Start with main_agent for initial analysis') + print(' 2. Route to critic for feedback/reflection') + print(' 3. Return to main_agent for improvements') + print(' 4. 
Finally route to final_agent for polishing') + + # Test input + test_input = 'Write a comprehensive guide on sustainable urban planning' + print(f'\nπŸ“ Test Input: {test_input}') + print('πŸ’‘ This would follow the strict Aβ†’Bβ†’Aβ†’C pattern automatically!') + + +async def run_strict_flow_example(): + """Example 2: Strict flow with no deviations""" + print('\n\n🎯 EXAMPLE 2: Strict Writer -> Reviewer -> Writer -> Editor Flow') + print('=' * 70) + + builder = AriumBuilder.from_yaml( + yaml_str=STRICT_FLOW_YAML, + base_llm=OpenAI(model='gpt-4o-mini', api_key='dummy-key'), + ) + + builder.build() + + print('βœ… Strict workflow built successfully!') + print('πŸ“‹ Flow Pattern: writer β†’ reviewer β†’ writer β†’ editor') + print('πŸ”’ Features:') + print(' β€’ Strict adherence to pattern (allow_early_exit: false)') + print(' β€’ LLM cannot deviate from the Aβ†’Bβ†’Aβ†’C sequence') + print(' β€’ Execution context tracks progress through flow') + + test_input = 'Create a blog post about renewable energy trends in 2024' + print(f'\nπŸ“ Test Input: {test_input}') + + +async def run_flexible_flow_example(): + """Example 3: Flexible flow with early exit option""" + print('\n\n🌟 EXAMPLE 3: Flexible Flow with Early Exit Option') + print('=' * 60) + + builder = AriumBuilder.from_yaml( + yaml_str=FLEXIBLE_FLOW_YAML, + base_llm=OpenAI(model='gpt-4o-mini', api_key='dummy-key'), + ) + + builder.build() + + print('βœ… Flexible workflow built successfully!') + print('πŸ“‹ Flow Pattern: analyst β†’ validator β†’ analyst β†’ presenter') + print('πŸ”“ Features:') + print(' β€’ Flexible routing (allow_early_exit: true)') + print(' β€’ LLM can skip steps if analysis is already sufficient') + print(' β€’ Smart adaptation based on conversation context') + + test_input = 'Analyze the quarterly sales data and identify key trends' + print(f'\nπŸ“ Test Input: {test_input}') + + +def demonstrate_reflection_router_features(): + """Show the key features of ReflectionRouter""" + print('\n\nπŸ“‹ 
ReflectionRouter Key Features') + print('=' * 50) + + features = """ +🎯 Reflection Pattern Tracking: + β€’ Automatically tracks progress through defined reflection sequence + β€’ Uses execution context (node_visit_count) for intelligent routing + β€’ Prevents infinite loops while allowing intentional revisits + +πŸ“Š Visual Progress Display: + β€’ Shows current position in pattern: β—‹ pending, βœ“ completed + β€’ Displays suggested next step based on reflection pattern + β€’ Provides clear feedback on workflow state + +βš™οΈ Configuration Options: + β€’ allow_early_exit: Enable/disable smart reflection termination + β€’ flow_pattern: Define exact sequence (e.g., [main, critic, main, final]) + β€’ Standard LLM router settings (temperature, fallback_strategy) + +πŸ”„ Execution Context Awareness: + β€’ Tracks how many times each node has been visited + β€’ Calculates expected visits based on reflection pattern position + β€’ Intelligently determines next step in sequence + +πŸ“ YAML Configuration: + ```yaml + routers: + - name: my_reflection_router + type: reflection + flow_pattern: [main_agent, critic, main_agent, final_agent] + settings: + allow_early_exit: false + temperature: 0.2 + ``` + +πŸ›‘οΈ Safety Features: + β€’ Inherits anti-infinite-loop mechanisms from base router + β€’ Provides clear error messages for configuration issues + β€’ Graceful fallback when pattern completion detected +""" + + print(features) + + +def show_yaml_schema(): + """Show complete YAML schema for ReflectionRouter""" + print('\n\nπŸ“„ Complete ReflectionRouter YAML Schema') + print('=' * 50) + + schema = """ +# Complete example with ReflectionRouter +metadata: + name: my-flow-workflow + version: 1.0.0 + description: "A -> B -> A -> C flow pattern example" + +arium: + agents: + - name: main_agent + role: Main Agent + job: "Your main agent job description" + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.7 + + - name: critic + role: Critic + job: "Your 
critic agent job description" + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.3 + + - name: final_agent + role: Final Agent + job: "Your final agent job description" + model: + provider: openai + name: gpt-4o-mini + + routers: + - name: reflection_router + type: reflection # Router type + flow_pattern: [main_agent, critic, main_agent, final_agent] # A->B->A->C pattern + model: # Optional: LLM for routing decisions + provider: openai + name: gpt-4o-mini + settings: # Optional settings + temperature: 0.2 # Router temperature + allow_early_exit: false # Allow early completion + fallback_strategy: first # first, last, random + + workflow: + start: main_agent + edges: + - from: main_agent + to: [critic, final_agent] # All possible destinations + router: reflection_router # Use reflection router + - from: critic + to: [main_agent, final_agent] + router: reflection_router + - from: final_agent + to: [end] + end: [final_agent] +""" + print(schema) + + +async def main(): + """Run all examples""" + print('🌟 ReflectionRouter Examples - A β†’ B β†’ A β†’ C Pattern Implementation') + print('=' * 80) + print('This example demonstrates the new ReflectionRouter for implementing') + print('structured reflection patterns with intelligent LLM-based routing! 
πŸŽ‰') + + # Show features and schema first + demonstrate_reflection_router_features() + show_yaml_schema() + + # Run examples + await run_main_critic_flow_example() + await run_strict_flow_example() + await run_flexible_flow_example() + + print('\n\nπŸŽ‰ All ReflectionRouter examples completed!') + print('=' * 80) + print('βœ… ReflectionRouter Benefits:') + print(' β€’ Simple YAML configuration for complex reflection patterns') + print(' β€’ Automatic progress tracking through execution context') + print(' β€’ Intelligent routing decisions based on reflection state') + print(' β€’ Flexible vs strict reflection control options') + print(' β€’ Built-in safety features and loop prevention') + print(' β€’ Easy integration with existing Arium workflows') + + print('\nπŸš€ Try it yourself:') + print(' 1. Define your agents (main, critic, final)') + print(' 2. Create a reflection router with your pattern') + print(' 3. Configure workflow edges') + print(' 4. Run your Aβ†’Bβ†’Aβ†’C reflection workflow!') + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/flo_ai/examples/simple_flow_router_demo.py b/flo_ai/examples/simple_flow_router_demo.py new file mode 100644 index 00000000..8b917b60 --- /dev/null +++ b/flo_ai/examples/simple_flow_router_demo.py @@ -0,0 +1,181 @@ +""" +Simple demonstration of ReflectionRouter for A -> B -> A -> C patterns. + +This shows the minimal code needed to implement a main -> critic -> main -> final +reflection pattern using the new ReflectionRouter. 
+""" + +import asyncio +from flo_ai.arium.builder import AriumBuilder +from flo_ai.arium.memory import MessageMemory +from flo_ai.models.agent import Agent +from flo_ai.llm import OpenAI +from flo_ai.arium.llm_router import create_main_critic_reflection_router + + +async def simple_reflection_demo(): + """Minimal example of A -> B -> A -> C reflection pattern""" + print('πŸš€ Simple ReflectionRouter Demo: A β†’ B β†’ A β†’ C Pattern') + print('=' * 60) + + # Create LLM (use dummy key for demo) + llm = OpenAI(model='gpt-4o-mini', api_key='dummy-key') + + # Create agents + main_agent = Agent( + name='main_agent', + system_prompt='You are the main agent. Analyze tasks and create solutions.', + llm=llm, + ) + + critic = Agent( + name='critic', + system_prompt='You are a critic. Provide constructive feedback to improve work.', + llm=llm, + ) + + final_agent = Agent( + name='final_agent', + system_prompt='You are the final agent. Polish and finalize the work.', + llm=llm, + ) + + # Create reflection router for A -> B -> A -> C pattern + reflection_router = create_main_critic_reflection_router( + main_agent='main_agent', + critic_agent='critic', + final_agent='final_agent', + allow_early_exit=False, # Strict reflection + llm=llm, + ) + + # Build workflow + builder = ( + AriumBuilder() + .with_memory(MessageMemory()) + .add_agents([main_agent, critic, final_agent]) + .start_with(main_agent) + .add_edge(main_agent, [critic, final_agent], reflection_router) + .add_edge(critic, [main_agent, final_agent], reflection_router) + .end_with(final_agent) + ) + + # Build the Arium + arium = builder.build() + + print('βœ… Workflow created successfully!') + print('πŸ“‹ Reflection Pattern: main_agent β†’ critic β†’ main_agent β†’ final_agent') + print('🎯 Router will automatically follow this sequence') + + # Demo input + print('\nπŸ“ Example input: "Create a project plan for a mobile app"') + print('πŸ’‘ The reflection router will:') + print(' Step 1: Route to critic (for 
feedback/reflection)') + print(' Step 2: Return to main_agent (to incorporate feedback)') + print(' Step 3: Route to final_agent (for final polish)') + + return arium + + +async def programmatic_reflection_example(): + """Show how to create reflection router programmatically""" + print('\n\nπŸ”§ Programmatic ReflectionRouter Creation') + print('=' * 50) + + from flo_ai.arium.llm_router import create_llm_router + + # Method 1: Using the convenience function + create_main_critic_reflection_router( + main_agent='writer', critic_agent='reviewer', final_agent='editor' + ) + print('βœ… Method 1: Convenience function create_main_critic_reflection_router()') + + # Method 2: Using the factory function directly + create_llm_router( + 'reflection', + flow_pattern=['analyst', 'validator', 'analyst', 'presenter'], + allow_early_exit=True, + ) + print('βœ… Method 2: Factory function create_llm_router(type="reflection")') + + # Method 3: Creating ReflectionRouter directly + from flo_ai.arium.llm_router import ReflectionRouter + + ReflectionRouter( + flow_pattern=['main', 'critic', 'main', 'final'], allow_early_exit=False + ) + print('βœ… Method 3: Direct ReflectionRouter instantiation') + + print('\n🎯 All methods create the same Aβ†’Bβ†’Aβ†’C reflection pattern!') + + +def show_minimal_yaml(): + """Show the minimal YAML needed""" + print('\n\nπŸ“„ Minimal YAML Configuration') + print('=' * 35) + + yaml_example = """ +# Minimal ReflectionRouter YAML +arium: + agents: + - name: main_agent + job: "Main agent job" + model: {provider: openai, name: gpt-4o-mini} + - name: critic + job: "Critic job" + model: {provider: openai, name: gpt-4o-mini} + - name: final_agent + job: "Final agent job" + model: {provider: openai, name: gpt-4o-mini} + + routers: + - name: reflection_router + type: reflection + flow_pattern: [main_agent, critic, main_agent, final_agent] + + workflow: + start: main_agent + edges: + - from: main_agent + to: [critic, final_agent] + router: reflection_router + - from: 
critic + to: [main_agent, final_agent] + router: reflection_router + - from: final_agent + to: [end] + end: [final_agent] +""" + print(yaml_example) + + +async def main(): + """Run the simple demo""" + print('🌟 Simple ReflectionRouter Demo') + print('=' * 35) + print('Demonstrating A β†’ B β†’ A β†’ C reflection pattern with minimal setup\n') + + # Show different creation methods + await programmatic_reflection_example() + + # Show minimal YAML + show_minimal_yaml() + + # Create simple reflection workflow + arium = await simple_reflection_demo() + + print('\n\nπŸŽ‰ Demo completed!') + print('Key takeaways:') + print('βœ… ReflectionRouter makes Aβ†’Bβ†’Aβ†’C patterns trivial to implement') + print('βœ… Works with both YAML and programmatic configuration') + print('βœ… Automatically tracks progress and prevents infinite loops') + print('βœ… Provides intelligent routing based on execution context') + + result = await arium.run( + inputs=['Write a comprehensive guide on sustainable urban planning'] + ) + print(result) + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/flo_ai/examples/simple_plan_execute_demo.py b/flo_ai/examples/simple_plan_execute_demo.py new file mode 100644 index 00000000..0b4e5607 --- /dev/null +++ b/flo_ai/examples/simple_plan_execute_demo.py @@ -0,0 +1,418 @@ +""" +Simple demonstration of PlanExecuteRouter with actual plan creation and execution. + +This demo shows how the PlanExecuteRouter works with PlanAwareMemory to create, +store, and execute plans step by step, similar to how Cursor works. 
+""" + +import asyncio +import uuid +from flo_ai.arium.builder import AriumBuilder +from flo_ai.arium.memory import PlanAwareMemory, ExecutionPlan, PlanStep, StepStatus +from flo_ai.llm import OpenAI +from flo_ai.models.agent import Agent +from flo_ai.arium.llm_router import create_plan_execute_router + + +async def demo_plan_aware_memory(): + """Demonstrate PlanAwareMemory functionality""" + print('πŸ“‹ DEMO: PlanAwareMemory - Plan Storage and Management') + print('=' * 55) + + # Create plan-aware memory + memory = PlanAwareMemory() + + # Create a sample execution plan + plan = ExecutionPlan( + id=str(uuid.uuid4()), + title='Build User Authentication System', + description='Create a complete user authentication system with login, registration, and JWT tokens', + steps=[ + PlanStep( + id='step_1', + description='Design database schema for users', + agent='developer', + status=StepStatus.PENDING, + ), + PlanStep( + id='step_2', + description='Implement user registration endpoint', + agent='developer', + dependencies=['step_1'], + status=StepStatus.PENDING, + ), + PlanStep( + id='step_3', + description='Implement login endpoint with JWT generation', + agent='developer', + dependencies=['step_1', 'step_2'], + status=StepStatus.PENDING, + ), + PlanStep( + id='step_4', + description='Add authentication middleware', + agent='developer', + dependencies=['step_3'], + status=StepStatus.PENDING, + ), + PlanStep( + id='step_5', + description='Write comprehensive tests', + agent='tester', + dependencies=['step_4'], + status=StepStatus.PENDING, + ), + PlanStep( + id='step_6', + description='Final code review and optimization', + agent='reviewer', + dependencies=['step_5'], + status=StepStatus.PENDING, + ), + ], + ) + + # Add plan to memory + memory.add_plan(plan) + print(f'βœ… Plan added to memory: {plan.title}') + print(f'πŸ“Š Total steps: {len(plan.steps)}') + + # Show plan progress + def show_progress(): + current_plan = memory.get_current_plan() + if current_plan: + 
print(f'\nπŸ“‹ Plan Progress: {current_plan.title}') + for step in current_plan.steps: + status_icon = { + StepStatus.PENDING: 'β—‹', + StepStatus.IN_PROGRESS: '⏳', + StepStatus.COMPLETED: 'βœ…', + StepStatus.FAILED: '❌', + }.get(step.status, 'β—‹') + deps = ( + f" (depends on: {', '.join(step.dependencies)})" + if step.dependencies + else '' + ) + print( + f' {status_icon} {step.id}: {step.description} β†’ {step.agent}{deps}' + ) + + show_progress() + + # Simulate step execution + print('\nπŸ”„ Simulating step execution...') + + # Execute step 1 + current_plan = memory.get_current_plan() + next_steps = current_plan.get_next_steps() + if next_steps: + step = next_steps[0] + print(f'\n⏳ Executing: {step.description}') + step.status = StepStatus.IN_PROGRESS + memory.update_plan(current_plan) + + # Simulate completion + step.status = StepStatus.COMPLETED + step.result = ( + 'User table created with id, email, password_hash, created_at fields' + ) + memory.update_plan(current_plan) + print(f'βœ… Completed: {step.description}') + + # Execute step 2 + current_plan = memory.get_current_plan() + next_steps = current_plan.get_next_steps() + if next_steps: + step = next_steps[0] + print(f'\n⏳ Executing: {step.description}') + step.status = StepStatus.COMPLETED + step.result = 'POST /api/register endpoint implemented with validation' + memory.update_plan(current_plan) + print(f'βœ… Completed: {step.description}') + + show_progress() + + # Check what's next + current_plan = memory.get_current_plan() + next_steps = current_plan.get_next_steps() + print(f'\n🎯 Next steps ready for execution: {len(next_steps)}') + for step in next_steps: + print(f' β†’ {step.id}: {step.description} (agent: {step.agent})') + + print(f'\nπŸ“ˆ Plan completion: {current_plan.is_completed()}') + + +async def demo_programmatic_plan_execute(): + """Demonstrate programmatic usage of PlanExecuteRouter""" + print('\n\nπŸ—οΈ DEMO: Programmatic PlanExecuteRouter Usage') + print('=' * 55) + + # Create LLM 
with dummy key for demo + llm = OpenAI(model='gpt-4o-mini', api_key='dummy-key') + + # Create agents + Agent( + name='planner', + system_prompt="""You are an expert project planner. When given a task, create a detailed execution plan. + +When asked to create a plan, respond with a structured format like this: + +EXECUTION PLAN: [Title] +DESCRIPTION: [Brief description] + +STEPS: +1. step_id: [description] β†’ [agent_name] +2. step_id: [description] β†’ [agent_name] (depends on: step1) +3. step_id: [description] β†’ [agent_name] (depends on: step1, step2) + +Always include clear dependencies and assign appropriate agents.""", + llm=llm, + ) + + Agent( + name='developer', + system_prompt='You are a software developer. Execute development tasks step by step.', + llm=llm, + ) + + Agent( + name='tester', + system_prompt='You are a QA tester. Test implementations and validate functionality.', + llm=llm, + ) + + Agent( + name='reviewer', + system_prompt='You are a code reviewer. Review completed work and provide final validation.', + llm=llm, + ) + + # Create plan-execute router + plan_router = create_plan_execute_router( + planner_agent='planner', + executor_agent='developer', + reviewer_agent='reviewer', + additional_agents={ + 'tester': 'Tests implementations and validates functionality' + }, + llm=llm, + ) + + # Create plan-aware memory + memory = PlanAwareMemory() + + print('βœ… Created agents and PlanExecuteRouter') + print('🎯 Router will coordinate: planner β†’ developer β†’ tester β†’ reviewer') + print('πŸ’Ύ Using PlanAwareMemory for plan state management') + + # Simulate routing decisions + print('\n🧠 Simulating router decision making...') + + # Add a message to trigger planning + memory.add({'role': 'user', 'content': 'Create a TODO app with React and Node.js'}) + + # Test routing with no plan (should route to planner) + try: + next_agent = plan_router(memory) + print(f'πŸ“ Router decision (no plan): {next_agent}') + print(' Expected: planner (to create 
execution plan)') + + except Exception as e: + print(f'⚠️ Router simulation note: {e}') + print(' (This is expected in demo mode without real LLM calls)') + + print('\nπŸ’‘ In a real scenario:') + print(' 1. Router would route to planner to create execution plan') + print(' 2. Planner creates detailed plan and stores in memory') + print(' 3. Router routes to developer for first development step') + print(' 4. Developer completes step and updates plan status') + print(' 5. Router routes to next step based on plan state') + print(' 6. Process continues until all steps complete') + print(' 7. Router routes to reviewer for final validation') + + +async def demo_yaml_plan_execute(): + """Demonstrate YAML configuration for PlanExecuteRouter""" + print('\n\nπŸ“„ DEMO: YAML Configuration for PlanExecuteRouter') + print('=' * 55) + + yaml_config = """ +metadata: + name: simple-plan-execute-demo + version: 1.0.0 + description: "Demo of plan-execute pattern" + +arium: + agents: + - name: planner + role: Task Planner + job: > + Break down complex tasks into detailed, sequential execution plans. + Create clear steps with dependencies and agent assignments. + model: + provider: openai + name: gpt-4o-mini + + - name: executor + role: Task Executor + job: > + Execute plan steps systematically, one by one. + Report progress and update plan status. + model: + provider: openai + name: gpt-4o-mini + + - name: validator + role: Quality Validator + job: > + Validate completed work and ensure quality standards. + Provide final approval for plan completion. 
+ model: + provider: openai + name: gpt-4o-mini + + routers: + - name: demo_plan_router + type: plan_execute + agents: + planner: "Creates detailed execution plans" + executor: "Executes plan steps systematically" + validator: "Validates final results" + model: + provider: openai + name: gpt-4o-mini + settings: + temperature: 0.2 + planner_agent: planner + executor_agent: executor + reviewer_agent: validator + + workflow: + start: planner + edges: + - from: planner + to: [executor, validator, planner] + router: demo_plan_router + - from: executor + to: [validator, executor, planner] + router: demo_plan_router + - from: validator + to: [end] + end: [validator] +""" + + try: + # Build workflow from YAML + builder = AriumBuilder.from_yaml( + yaml_str=yaml_config, + base_llm=OpenAI(model='gpt-4o-mini', api_key='dummy-key'), + ) + + builder.build() + print('βœ… YAML workflow built successfully!') + print('πŸ“‹ Configured plan-execute pattern with 3 agents') + print('πŸ”„ Router will coordinate planning β†’ execution β†’ validation') + + # Show the workflow structure + print('\nπŸ“Š Workflow Structure:') + print(' Start: planner') + print(' Edges:') + print(' planner β†’ [executor, validator, planner] (plan_execute_router)') + print(' executor β†’ [validator, executor, planner] (plan_execute_router)') + print(' validator β†’ [end]') + print(' End: validator') + + except Exception as e: + print(f'ℹ️ YAML demo note: {e}') + + print('\n🎯 Key YAML Features:') + print(' β€’ type: plan_execute - Enables plan-execute routing') + print(' β€’ agents: Dict mapping agent names to descriptions') + print(' β€’ planner_agent: Agent responsible for creating plans') + print(' β€’ executor_agent: Default agent for executing steps') + print(' β€’ reviewer_agent: Optional agent for final review') + + +def show_memory_integration(): + """Show how PlanExecuteRouter integrates with memory""" + print('\n\nπŸ’Ύ DEMO: Memory Integration with PlanExecuteRouter') + print('=' * 55) + + 
integration_info = """ +πŸ”„ PlanExecuteRouter Memory Integration: + +1. Plan Creation Phase: + β€’ Router detects no plan in memory + β€’ Routes to planner agent + β€’ Planner creates ExecutionPlan with steps + β€’ Plan stored in PlanAwareMemory + +2. Execution Phase: + β€’ Router checks current plan state + β€’ Identifies next ready steps (dependencies met) + β€’ Routes to appropriate agent for step execution + β€’ Agent updates step status and results + +3. Progress Tracking: + β€’ Plan progress visualized with status indicators: + β—‹ Pending ⏳ In Progress βœ… Completed ❌ Failed + β€’ Dependencies automatically managed + β€’ Failed steps trigger recovery routing + +4. Memory Persistence: + β€’ Plan state persists across agent interactions + β€’ Step results and metadata stored + β€’ Execution context maintained + +5. Completion Handling: + β€’ Router detects when all steps complete + β€’ Routes to reviewer agent (if configured) + β€’ Final validation and workflow completion + +πŸ“Š Memory Structure: +```python +memory = PlanAwareMemory() +memory.add_plan(execution_plan) # Store plan +current_plan = memory.get_current_plan() # Retrieve active plan +next_steps = current_plan.get_next_steps() # Get ready steps +``` + +🎯 Router Intelligence: +β€’ Automatically routes based on plan state +β€’ Handles step dependencies and execution order +β€’ Provides context-aware prompts with progress +β€’ Manages error recovery and retry logic +""" + + print(integration_info) + + +async def main(): + """Run all plan-execute demos""" + print('πŸš€ PlanExecuteRouter Simple Demo') + print('=' * 40) + print('This demo shows the PlanExecuteRouter in action with actual plan') + print('creation, storage, and step-by-step execution tracking! 
πŸŽ‰\n') + + # Run demos + await demo_plan_aware_memory() + await demo_programmatic_plan_execute() + await demo_yaml_plan_execute() + show_memory_integration() + + print('\n\nπŸŽ‰ PlanExecuteRouter Demo Complete!') + print('=' * 45) + print('βœ… What we demonstrated:') + print(' β€’ PlanAwareMemory for plan storage and state tracking') + print(' β€’ ExecutionPlan with steps, dependencies, and status') + print(' β€’ Programmatic router creation and usage') + print(' β€’ YAML configuration for plan-execute workflows') + print(' β€’ Memory integration and plan state management') + + print('\nπŸš€ Ready to build your own Cursor-style workflows!') + print(' Try the PlanExecuteRouter for complex task automation! 🎯') + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/flo_ai/examples/simple_reflection_router_demo.py b/flo_ai/examples/simple_reflection_router_demo.py new file mode 100644 index 00000000..7c4c2e40 --- /dev/null +++ b/flo_ai/examples/simple_reflection_router_demo.py @@ -0,0 +1,188 @@ +""" +Simple demonstration of ReflectionRouter for A -> B -> A -> C patterns. + +This shows the minimal code needed to implement a main -> critic -> main -> final +reflection pattern using the new ReflectionRouter. +""" + +import asyncio +from flo_ai.arium.builder import AriumBuilder +from flo_ai.arium.memory import MessageMemory +from flo_ai.models.agent import Agent +from flo_ai.llm import OpenAI +from flo_ai.arium.llm_router import create_main_critic_reflection_router + + +async def simple_reflection_demo(): + """Minimal example of A -> B -> A -> C reflection pattern""" + print('πŸš€ Simple ReflectionRouter Demo: A β†’ B β†’ A β†’ C Pattern') + print('=' * 60) + + # Create LLM (use dummy key for demo) + llm = OpenAI(model='gpt-4o-mini') + + # Create agents + main_agent = Agent( + name='main_agent', + system_prompt='You are the main agent. Analyze tasks and create solutions.', + llm=llm, + ) + + critic = Agent( + name='critic', + system_prompt='You are a critic. 
Provide constructive feedback to improve work.', + llm=llm, + ) + + final_agent = Agent( + name='final_agent', + system_prompt='You are the final agent. Polish and finalize the work.', + llm=llm, + ) + + # Create reflection router for A -> B -> A -> C pattern + reflection_router = create_main_critic_reflection_router( + main_agent='main_agent', + critic_agent='critic', + final_agent='final_agent', + allow_early_exit=False, # Strict reflection + llm=llm, + ) + + # Build workflow + builder = ( + AriumBuilder() + .with_memory(MessageMemory()) + .add_agents([main_agent, critic, final_agent]) + .start_with(main_agent) + .add_edge(main_agent, [critic, final_agent], reflection_router) + .add_edge(critic, [main_agent, final_agent], reflection_router) + .end_with(final_agent) + ) + + # Build the Arium + arium = builder.build() + + print('βœ… Workflow created successfully!') + print('πŸ“‹ Reflection Pattern: main_agent β†’ critic β†’ main_agent β†’ final_agent') + print('🎯 Router will automatically follow this sequence') + + # Demo input + print('\nπŸ“ Example input: "Create a project plan for a mobile app"') + print('πŸ’‘ The reflection router will:') + print(' Step 1: Route to critic (for feedback/reflection)') + print(' Step 2: Return to main_agent (to incorporate feedback)') + print(' Step 3: Route to final_agent (for final polish)') + + return arium + + +async def programmatic_reflection_example(): + """Show how to create reflection router programmatically""" + print('\n\nπŸ”§ Programmatic ReflectionRouter Creation') + print('=' * 50) + + from flo_ai.arium.llm_router import create_llm_router + + # Method 1: Using the convenience function + create_main_critic_reflection_router( + main_agent='writer', + critic_agent='reviewer', + final_agent='editor', + llm=OpenAI(model='gpt-4o-mini', api_key='dummy-key'), + ) + print('βœ… Method 1: Convenience function create_main_critic_reflection_router()') + + # Method 2: Using the factory function directly + create_llm_router( + 
'reflection', + flow_pattern=['analyst', 'validator', 'analyst', 'presenter'], + allow_early_exit=True, + llm=OpenAI(model='gpt-4o-mini', api_key='dummy-key'), + ) + print('βœ… Method 2: Factory function create_llm_router(type="reflection")') + + # Method 3: Creating ReflectionRouter directly + from flo_ai.arium.llm_router import ReflectionRouter + + ReflectionRouter( + flow_pattern=['main', 'critic', 'main', 'final'], + allow_early_exit=False, + llm=OpenAI(model='gpt-4o-mini', api_key='dummy-key'), + ) + print('βœ… Method 3: Direct ReflectionRouter instantiation') + + print('\n🎯 All methods create the same Aβ†’Bβ†’Aβ†’C reflection pattern!') + + +def show_minimal_yaml(): + """Show the minimal YAML needed""" + print('\n\nπŸ“„ Minimal YAML Configuration') + print('=' * 35) + + yaml_example = """ +# Minimal ReflectionRouter YAML +arium: + agents: + - name: main_agent + job: "Main agent job" + model: {provider: openai, name: gpt-4o-mini} + - name: critic + job: "Critic job" + model: {provider: openai, name: gpt-4o-mini} + - name: final_agent + job: "Final agent job" + model: {provider: openai, name: gpt-4o-mini} + + routers: + - name: reflection_router + type: reflection + flow_pattern: [main_agent, critic, main_agent, final_agent] + + workflow: + start: main_agent + edges: + - from: main_agent + to: [critic, final_agent] + router: reflection_router + - from: critic + to: [main_agent, final_agent] + router: reflection_router + - from: final_agent + to: [end] + end: [final_agent] +""" + print(yaml_example) + + +async def main(): + """Run the simple demo""" + print('🌟 Simple ReflectionRouter Demo') + print('=' * 35) + print('Demonstrating A β†’ B β†’ A β†’ C reflection pattern with minimal setup\n') + + # Show different creation methods + await programmatic_reflection_example() + + # Show minimal YAML + show_minimal_yaml() + + # Create simple reflection workflow + arium = await simple_reflection_demo() + + result = await arium.run( + inputs=['Write a comprehensive guide 
on sustainable urban planning'] + ) + + print('\n\nπŸŽ‰ Demo completed!') + print('Key takeaways:') + print('βœ… ReflectionRouter makes Aβ†’Bβ†’Aβ†’C patterns trivial to implement') + print('βœ… Works with both YAML and programmatic configuration') + print('βœ… Automatically tracks progress and prevents infinite loops') + print('βœ… Provides intelligent routing based on execution context') + + print(result) + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/flo_ai/examples/simple_working_demo.py b/flo_ai/examples/simple_working_demo.py new file mode 100644 index 00000000..fd939d7f --- /dev/null +++ b/flo_ai/examples/simple_working_demo.py @@ -0,0 +1,335 @@ +""" +Simple Working PlanExecuteRouter Demo + +This version fixes the planner loop by using a different approach: +1. Use a standard workflow without plan storage complexity +2. Demonstrate the routing intelligence with manual plan simulation +3. Show how the router makes decisions based on context +""" + +import asyncio +import os +from flo_ai.models.agent import Agent +from flo_ai.llm import OpenAI +from flo_ai.arium.memory import MessageMemory +from flo_ai.arium.llm_router import create_plan_execute_router +from flo_ai.arium import AriumBuilder + + +async def simple_working_demo(): + """Simple working demo that avoids the planner loop""" + print('βœ… Simple Working PlanExecuteRouter Demo') + print('=' * 45) + + # Check API key + api_key = os.getenv('OPENAI_API_KEY') + if not api_key: + print('❌ OPENAI_API_KEY environment variable not set') + print(' Set it with: export OPENAI_API_KEY=your_key_here') + return + + print('βœ… OpenAI API key found') + + # Create LLM + llm = OpenAI(model='gpt-4o', api_key=api_key) + + # Create simple agents focused on their core tasks + planner = Agent( + name='planner', + system_prompt="""You are a project planner. When given a task, create a detailed plan with numbered steps. + +Format your response like this: + +PLAN FOR: [task name] + +EXECUTION STEPS: +1. 
[First step description] +2. [Second step description] +3. [Third step description] +4. [Final step description] + +NEXT ACTION: The developer should start with step 1. + +Keep it clear and actionable.""", + llm=llm, + ) + + developer = Agent( + name='developer', + system_prompt="""You are a software developer. When given a development task: + +1. Acknowledge what you're implementing +2. Provide implementation details +3. Mention any important considerations +4. State when you've completed the task + +Be specific and thorough in your implementation.""", + llm=llm, + ) + + tester = Agent( + name='tester', + system_prompt="""You are a QA tester. When given something to test: + +1. Acknowledge what you're testing +2. Create test scenarios +3. Identify potential issues +4. Provide test results and recommendations + +Be thorough in your testing approach.""", + llm=llm, + ) + + reviewer = Agent( + name='reviewer', + system_prompt="""You are a senior reviewer. When reviewing work: + +1. Assess overall quality and completeness +2. Check if requirements are met +3. Provide constructive feedback +4. 
Give final approval or suggest improvements + +Focus on delivering high-quality results.""", + llm=llm, + ) + + print('βœ… Created focused agents: planner, developer, tester, reviewer') + + # Create a simple router using the proper factory function + from typing import Literal + from flo_ai.arium.memory import BaseMemory + + def create_simple_router(): + """Create a simple router with proper type annotations""" + + def router_impl( + memory: BaseMemory, + ) -> Literal['developer', 'tester', 'reviewer']: + """Simple routing logic for demo purposes""" + messages = memory.get() + + # Check the conversation flow + if not messages: + return 'developer' # Start with developer after planner + + last_message = str(messages[-1]) + + # Basic routing logic based on content + if 'PLAN FOR:' in last_message and 'EXECUTION STEPS:' in last_message: + print('πŸ“‹ Plan detected - routing to developer') + return 'developer' + elif ( + 'implemented' in last_message.lower() + or 'development' in last_message.lower() + ): + print('πŸ’» Development complete - routing to tester') + return 'tester' + elif 'test' in last_message.lower() and 'complete' in last_message.lower(): + print('πŸ§ͺ Testing complete - routing to reviewer') + return 'reviewer' + elif len(messages) > 6: # Prevent too many iterations + print('🏁 Workflow complete - ending') + return 'reviewer' + else: + return 'developer' # Default fallback + + # Add required annotations + router_impl.__annotations__ = { + 'memory': BaseMemory, + 'return': Literal['developer', 'tester', 'reviewer'], + } + + return router_impl + + simple_router = create_simple_router() + + # Create memory + memory = MessageMemory() + + # Build workflow with simple routing + # Note: Each edge's 'to' nodes must include all possible router return values + arium = ( + AriumBuilder() + .with_memory(memory) + .add_agents([planner, developer, tester, reviewer]) + .start_with(planner) + .add_edge(planner, [developer, tester, reviewer], simple_router) + 
.add_edge( + developer, [developer, tester, reviewer], simple_router + ) # Include all possible destinations + .add_edge( + tester, [developer, tester, reviewer], simple_router + ) # Include all possible destinations + .end_with(reviewer) + .build() + ) + + print('βœ… Built simple workflow with basic routing') + + # Task to execute + task = 'Create a simple login endpoint with username and password validation' + + print(f'\nπŸ“‹ Task: {task}') + print('\nπŸ”„ Running simple workflow...') + print(' This demonstrates the routing concept without complex plan storage') + + try: + # Execute workflow + result = await arium.run([task]) + + print('\n' + '=' * 50) + print('πŸŽ‰ SIMPLE WORKFLOW COMPLETED!') + print('=' * 50) + + # Show the conversation flow + if memory.get(): + print('\nπŸ“„ Conversation Flow:') + print('-' * 30) + for i, msg in enumerate(memory.get(), 1): + role = msg.get('role', 'unknown') + content = str(msg.get('content', ''))[:200] + print(f'{i}. {role.upper()}: {content}...') + + # Show final result + if result: + final_result = result[-1] if isinstance(result, list) else result + print('\nπŸ“„ Final Output:') + print('-' * 30) + print(final_result) + + print('\nπŸ’‘ What this demonstrated:') + print(' β€’ Basic plan-execute workflow concept') + print(' β€’ Intelligent routing between phases') + print(' β€’ Planner β†’ Developer β†’ Tester β†’ Reviewer flow') + print(' β€’ How to avoid infinite loops with simple logic') + + except Exception as e: + print(f'\n❌ Error: {e}') + + +async def demonstrate_plan_execute_router(): + """Show the actual PlanExecuteRouter in a controlled way""" + print('\n\nπŸ“Š PlanExecuteRouter Demonstration') + print('=' * 40) + + api_key = os.getenv('OPENAI_API_KEY') + if not api_key: + print('❌ OPENAI_API_KEY not set') + return + + llm = OpenAI(model='gpt-4o', api_key=api_key) + + # Create the actual PlanExecuteRouter + create_plan_execute_router( + planner_agent='planner', + executor_agent='developer', + 
reviewer_agent='reviewer', + additional_agents={'tester': 'Tests implementations'}, + llm=llm, + ) + + print('βœ… Created actual PlanExecuteRouter') + + # Test router decision making with mock memory + memory = MessageMemory() + + # Simulate different scenarios + scenarios = [ + {'msg': 'Create a user API', 'context': 'Initial request'}, + {'msg': 'Plan created with 4 steps', 'context': 'After planning'}, + {'msg': 'Step 1 implemented successfully', 'context': 'After development'}, + {'msg': 'All tests passed', 'context': 'After testing'}, + ] + + print('\n🧠 Router Decision Making:') + for scenario in scenarios: + memory.add({'role': 'user', 'content': scenario['msg']}) + + try: + # This would make an actual LLM call to decide routing + print(f'\n Context: {scenario["context"]}') + print(f' Message: {scenario["msg"]}') + print(' Router would make intelligent decision based on this context') + # decision = plan_router(memory) # Uncomment to see actual routing + # print(f' Decision: Route to {decision}') + except Exception as e: + print(f' Note: {e}') + + print('\nπŸ’‘ The PlanExecuteRouter makes intelligent routing decisions by:') + print(' β€’ Analyzing conversation context') + print(' β€’ Detecting plan creation vs execution phases') + print(' β€’ Understanding step dependencies and progress') + print(' β€’ Routing to appropriate agents based on workflow state') + + +def show_solution_approaches(): + """Show different approaches to fix the planner loop""" + print('\n\nπŸ”§ Solutions to the Planner Loop Issue') + print('=' * 45) + + solutions = """ +The planner loop happened because the router couldn't detect plan completion. +Here are several solutions: + +1. πŸ“‹ SPECIALIZED PLANNER AGENT (Best Solution) + β€’ PlannerAgent that stores ExecutionPlan objects in PlanAwareMemory + β€’ Router detects when plans exist and switches to execution mode + β€’ See fixed_plan_execute_demo.py for implementation + +2. 
🎯 CONTENT-BASED ROUTING (Simple Solution) + β€’ Router analyzes message content to detect phases + β€’ If message contains "PLAN:", route to developer + β€’ If message contains "implemented:", route to tester + β€’ See simple_working_demo.py example above + +3. πŸ”„ LIMITED ITERATIONS (Quick Fix) + β€’ Add max iteration limits to prevent infinite loops + β€’ Router switches phases after X iterations + β€’ Less intelligent but prevents loops + +4. πŸ“Š STATE MANAGEMENT (Advanced Solution) + β€’ Use custom memory with explicit state tracking + β€’ Store workflow phase (planning/executing/reviewing) + β€’ Router uses state to make decisions + +5. 🧠 BETTER PROMPTING (Prompt Engineering) + β€’ Improve router prompts to better detect completion + β€’ Add explicit "PLANNING COMPLETE" markers + β€’ Train router to recognize different phases + +Recommendation: Use approach #1 (Specialized Planner Agent) for production, +approach #2 (Content-Based Routing) for quick demos. +""" + + print(solutions) + + +async def main(): + """Main demo function""" + print('🎯 Working PlanExecuteRouter Demo (Loop Issue Fixed)') + print('This demo shows how to avoid the planner loop issue!\n') + + if not os.getenv('OPENAI_API_KEY'): + print('❌ To run this demo, set your OPENAI_API_KEY environment variable') + print(' export OPENAI_API_KEY=your_key_here') + return + + # Show solution approaches + show_solution_approaches() + + # Run simple working demo + await simple_working_demo() + + # Demonstrate the actual router + await demonstrate_plan_execute_router() + + print('\n\nπŸŽ‰ Demo Complete!') + print('=' * 20) + print('βœ… Demonstrated working plan-execute workflow') + print('βœ… Showed how to avoid planner loops') + print('βœ… Explained multiple solution approaches') + print('\nπŸš€ Try the fixed_plan_execute_demo.py for the complete solution!') + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/flo_ai/flo_ai/arium/base.py b/flo_ai/flo_ai/arium/base.py index 26943295..c3458354 
100644 --- a/flo_ai/flo_ai/arium/base.py +++ b/flo_ai/flo_ai/arium/base.py @@ -103,16 +103,38 @@ def add_edge( if literal_values is None: raise ValueError('Router return type is not a Literal') - invalid_literals = [val for val in literal_values if val not in to_nodes] + # Check if router supports self-reference + supports_self_ref = getattr(router, 'supports_self_reference', False) + + # For self-referencing routers, we need to include the from_node in valid targets + valid_targets = to_nodes.copy() + if supports_self_ref: + valid_targets.append(from_node) + + invalid_literals = [ + val for val in literal_values if val not in valid_targets + ] if invalid_literals: raise ValueError( - f'Router return type includes literal values {invalid_literals} that are not in to_nodes {to_nodes}' + f'Router return type includes literal values {invalid_literals} that are not in valid targets {valid_targets}' ) - if set(literal_values) != set(to_nodes): - raise ValueError( - f'Router return type values {literal_values} do not match to_nodes {to_nodes}' - ) + # For self-referencing routers, allow router options to include from_node + if supports_self_ref: + # Router can return any of: to_nodes + from_node, but must include all to_nodes + missing_targets = [ + node for node in to_nodes if node not in literal_values + ] + if missing_targets: + raise ValueError( + f'Self-referencing router must include all to_nodes {to_nodes}, missing: {missing_targets}' + ) + else: + # Non-self-referencing routers must match exactly + if set(literal_values) != set(to_nodes): + raise ValueError( + f'Router return type values {literal_values} do not match to_nodes {to_nodes}' + ) self.edges[from_node] = Edge( router_fn=router diff --git a/flo_ai/flo_ai/arium/builder.py b/flo_ai/flo_ai/arium/builder.py index fc0821fa..633d59d6 100644 --- a/flo_ai/flo_ai/arium/builder.py +++ b/flo_ai/flo_ai/arium/builder.py @@ -230,7 +230,7 @@ def from_yaml( # LLM Router definitions (NEW) routers: - name: content_router 
- type: smart # smart, task_classifier, conversation_analysis + type: smart # smart, task_classifier, conversation_analysis, reflection, plan_execute routing_options: technical_writer: "Handle technical documentation tasks" creative_writer: "Handle creative writing tasks" @@ -242,6 +242,26 @@ def from_yaml( temperature: 0.3 fallback_strategy: first + # Reflection router for A -> B -> A -> C patterns + - name: main_critic_reflection + type: reflection + flow_pattern: [main_agent, critic, main_agent, final_agent] + settings: + allow_early_exit: false + + # Plan-Execute router for Cursor-style workflows + - name: plan_execute_router + type: plan_execute + agents: + planner: "Creates detailed execution plans" + developer: "Implements code and features" + tester: "Tests implementations" + reviewer: "Reviews final results" + settings: + planner_agent: planner + executor_agent: developer + reviewer_agent: reviewer + workflow: start: content_analyst edges: @@ -418,9 +438,37 @@ def from_yaml( llm=router_llm, **settings, ) + + elif router_type == 'reflection': + flow_pattern = router_config.get('flow_pattern', []) + if not flow_pattern: + raise ValueError( + f'Reflection router {router_name} must specify flow_pattern' + ) + + router_fn = create_llm_router( + router_type='reflection', + flow_pattern=flow_pattern, + llm=router_llm, + **settings, + ) + + elif router_type == 'plan_execute': + agents = router_config.get('agents', {}) + if not agents: + raise ValueError( + f'Plan-Execute router {router_name} must specify agents' + ) + + router_fn = create_llm_router( + router_type='plan_execute', + agents=agents, + llm=router_llm, + **settings, + ) else: raise ValueError( - f'Unknown router type: {router_type}. Supported types: smart, task_classifier, conversation_analysis' + f'Unknown router type: {router_type}. 
class ReflectionRouter(BaseLLMRouter):
    """
    A router designed for reflection patterns like A -> B -> A -> C.

    Commonly used for main -> critic -> main -> final workflows where B is a
    reflection/critique step. Uses the execution context supplied by the runtime
    (node visit counts, current node, iteration count) to determine flow state
    and make intelligent routing decisions.
    """

    def __init__(
        self,
        flow_pattern: List[str],
        llm: Optional[BaseLLM] = None,
        allow_early_exit: bool = False,
        **kwargs,
    ):
        """
        Initialize the reflection router.

        Args:
            flow_pattern: List of node names defining the reflection pattern
                (e.g., ["main", "critic", "main", "final"]). Names may repeat.
            llm: LLM instance for routing decisions
            allow_early_exit: Whether to allow the LLM to exit the pattern early
                if it judges the reflection cycle complete
            **kwargs: Additional arguments for BaseLLMRouter
        """
        super().__init__(llm=llm, **kwargs)
        self.flow_pattern = flow_pattern
        self.allow_early_exit = allow_early_exit
        # Unlike most routers, a reflection pattern may return to the node that
        # just ran (A -> B -> A), so self-reference must be permitted.
        self.supports_self_reference = True

    def get_routing_options(self) -> Dict[str, str]:
        """Get available routing options based on the flow pattern.

        Returns one entry per distinct node, describing where in the pattern
        that node appears (1-based step numbers, human-readable).
        """
        # Preserve first-seen order while removing duplicates.
        unique_nodes = list(dict.fromkeys(self.flow_pattern))
        pattern_text = ' -> '.join(self.flow_pattern)

        options = {}
        for node in unique_nodes:
            positions = [i for i, x in enumerate(self.flow_pattern) if x == node]
            if len(positions) > 1:
                # FIX: previously this interpolated the raw list of 0-based
                # indices (e.g. "Step [0, 2]") into the LLM prompt. Render
                # 1-based step numbers consistently with the single-step branch.
                steps_text = ' and '.join(str(p + 1) for p in positions)
                options[node] = (
                    f'Steps {steps_text} in the reflection pattern: {pattern_text}'
                )
            else:
                options[node] = (
                    f'Step {positions[0] + 1} in the reflection pattern: {pattern_text}'
                )

        return options

    def _get_next_step_in_pattern(self, execution_context: dict) -> Optional[str]:
        """Determine the next node in the reflection pattern.

        Compares the runtime visit counts against how many visits each pattern
        position implies, and returns the node after the furthest position the
        current node has satisfied. Returns None when the pattern is complete.
        """
        if not execution_context:
            # No context yet: start at the beginning of the pattern.
            return self.flow_pattern[0] if self.flow_pattern else None

        visit_counts = execution_context.get('node_visit_count', {})
        current_node = execution_context.get('current_node', '')

        try:
            # Find the furthest pattern position already satisfied by the
            # current node's visit count; -1 means "not started".
            current_step = -1
            for i, node in enumerate(self.flow_pattern):
                if node == current_node:
                    # How many visits this node should have accumulated by
                    # the time pattern position i is done.
                    expected_visits = len(
                        [x for x in self.flow_pattern[: i + 1] if x == node]
                    )
                    if visit_counts.get(node, 0) >= expected_visits:
                        current_step = i

            next_step_index = current_step + 1
            if next_step_index < len(self.flow_pattern):
                return self.flow_pattern[next_step_index]
            # Pattern completed.
            return None

        except Exception:
            # Defensive fallback: restart at the first step.
            return self.flow_pattern[0] if self.flow_pattern else None

    def get_routing_prompt(
        self,
        memory: BaseMemory,
        options: Dict[str, str],
        execution_context: dict = None,
    ) -> str:
        """Build the routing prompt shown to the LLM for this decision.

        Includes the last few messages, the pattern progress (βœ“/β—‹ markers),
        and either a strict or an early-exit-allowed instruction set depending
        on ``allow_early_exit``.
        """
        conversation = memory.get()

        # Only the last 3 messages are needed for flow context.
        if isinstance(conversation, list):
            conversation_text = '\n'.join([str(msg) for msg in conversation[-3:]])
        else:
            conversation_text = str(conversation)

        # Determine suggested next step based on reflection pattern.
        suggested_next = self._get_next_step_in_pattern(execution_context)

        options_text = '\n'.join(
            [f'- {name}: {desc}' for name, desc in options.items()]
        )

        # Add execution context info (pattern progress visualization).
        context_info = ''
        if execution_context:
            visit_counts = execution_context.get('node_visit_count', {})
            current_node = execution_context.get('current_node', 'unknown')
            iteration = execution_context.get('iteration_count', 0)

            # βœ“ for positions whose expected visit count is satisfied, β—‹ otherwise.
            pattern_progress = []
            for i, node in enumerate(self.flow_pattern):
                visits = visit_counts.get(node, 0)
                expected_visits = len(
                    [x for x in self.flow_pattern[: i + 1] if x == node]
                )
                status = 'βœ“' if visits >= expected_visits else 'β—‹'
                pattern_progress.append(f'{status} {node}')

            context_info = f"""
πŸ“‹ REFLECTION PATTERN: {' β†’ '.join(self.flow_pattern)}
πŸ“ CURRENT PROGRESS: {' β†’ '.join(pattern_progress)}
🎯 SUGGESTED NEXT: {suggested_next or 'Pattern Complete'}
πŸ’‘ CURRENT NODE: {current_node} (iteration {iteration})
"""

        # Create prompt based on whether early exit is allowed.
        if self.allow_early_exit:
            prompt = f"""You are a reflection coordinator managing this workflow pattern: {' β†’ '.join(self.flow_pattern)}

{context_info}
Available options:
{options_text}

Recent conversation:
{conversation_text}

Instructions:
1. Follow the reflection pattern: {' β†’ '.join(self.flow_pattern)}
2. The suggested next step is: {suggested_next or 'Pattern Complete'}
3. You may exit early if the reflection cycle is complete
4. Consider conversation context and reflection progress
5. Respond with ONLY the agent name (no explanations)

Next agent:"""
        else:
            prompt = f"""You are a reflection coordinator managing this strict reflection pattern: {' β†’ '.join(self.flow_pattern)}

{context_info}
Available options:
{options_text}

Recent conversation:
{conversation_text}

Instructions:
1. STRICTLY follow the reflection pattern: {' β†’ '.join(self.flow_pattern)}
2. The next step should be: {suggested_next or 'Pattern Complete'}
3. Do not deviate from the pattern unless absolutely necessary
4. Respond with ONLY the agent name (no explanations)

Next agent:"""

        return prompt


class PlanExecuteRouter(BaseLLMRouter):
    """
    A router that implements plan-and-execute patterns like Cursor.

    Routes to a planner agent until an ExecutionPlan exists in memory, then
    routes through the plan's steps sequentially (executor, reviewer, retries).
    """

    def __init__(
        self,
        agents: Dict[str, str],  # agent_name -> description mapping
        planner_agent: str = 'planner',
        executor_agent: str = 'executor',
        reviewer_agent: Optional[str] = None,
        llm: Optional[BaseLLM] = None,
        max_retries: int = 3,
        **kwargs,
    ):
        """
        Initialize the plan-execute router.

        Args:
            agents: Dict mapping agent names to their descriptions/capabilities
            planner_agent: Name of the agent responsible for creating plans
            executor_agent: Name of the agent responsible for executing steps
            reviewer_agent: Optional name of the agent responsible for reviewing results
            llm: LLM instance for routing decisions
            max_retries: Maximum retries for step execution
            **kwargs: Additional arguments for BaseLLMRouter
        """
        super().__init__(llm=llm, **kwargs)
        self.agents = agents
        self.planner_agent = planner_agent
        self.executor_agent = executor_agent
        self.reviewer_agent = reviewer_agent
        # NOTE(review): this intentionally overrides the BaseLLMRouter
        # max_retries set in super().__init__ with the step-execution value.
        self.max_retries = max_retries
        # Can route to the same agent repeatedly for iterative execution.
        self.supports_self_reference = True

    def get_routing_options(self) -> Dict[str, str]:
        """Get available routing options based on configured agents."""
        return self.agents

    def get_routing_prompt(
        self,
        memory: BaseMemory,
        options: Dict[str, str],
        execution_context: dict = None,
    ) -> str:
        """Build the routing prompt: planning phase if no plan exists in memory,
        execution phase otherwise."""
        conversation = memory.get()

        # Only the last 3 messages are needed for context.
        if isinstance(conversation, list):
            conversation_text = '\n'.join([str(msg) for msg in conversation[-3:]])
        else:
            conversation_text = str(conversation)

        # Plan support is optional on memory implementations, so feature-detect.
        current_plan = (
            memory.get_current_plan() if hasattr(memory, 'get_current_plan') else None
        )

        if current_plan is None:
            # No plan exists - route to planner.
            return self._create_planning_prompt(conversation_text, options)
        # Plan exists - determine next action based on plan state.
        return self._create_execution_prompt(
            current_plan, conversation_text, options, execution_context
        )

    def _create_planning_prompt(
        self, conversation_text: str, options: Dict[str, str]
    ) -> str:
        """Create prompt for the initial planning phase (always routes to the planner)."""
        options_text = '\n'.join(
            [f'- {name}: {desc}' for name, desc in options.items()]
        )

        prompt = f"""You are coordinating a plan-and-execute workflow. No execution plan exists yet.

Available agents:
{options_text}

Recent conversation:
{conversation_text}

TASK: Create an execution plan by routing to the {self.planner_agent}.

Instructions:
1. Route to "{self.planner_agent}" to create a detailed execution plan
2. The planner will break down the task into sequential steps
3. Each step will specify which agent should execute it
4. Respond with ONLY the agent name: {self.planner_agent}

Next agent:"""

        return prompt

    def _create_execution_prompt(
        self,
        plan: ExecutionPlan,
        conversation_text: str,
        options: Dict[str, str],
        execution_context: dict = None,
    ) -> str:
        """Create prompt for the execution phase based on current plan state.

        Decides between: final review (plan complete), retry (failed step),
        executing the next ready step, or a status check while blocked on
        dependencies.
        """

        # Steps whose dependencies are all completed.
        next_steps = plan.get_next_steps()

        # Format plan progress with per-status icons.
        progress_lines = []
        for step in plan.steps:
            status_icon = {
                StepStatus.PENDING: 'β—‹',
                StepStatus.IN_PROGRESS: '⏳',
                StepStatus.COMPLETED: 'βœ…',
                StepStatus.FAILED: '❌',
                StepStatus.SKIPPED: '⏭️',
            }.get(step.status, 'β—‹')
            progress_lines.append(
                f'{status_icon} {step.id}: {step.description} (β†’ {step.agent})'
            )

        progress_text = '\n'.join(progress_lines)

        # Determine what to do next.
        if plan.is_completed():
            # All steps completed - hand off to reviewer if one is configured.
            if self.reviewer_agent and self.reviewer_agent in options:
                action = f'Route to {self.reviewer_agent} for final review'
                suggested_agent = self.reviewer_agent
            else:
                action = 'Plan completed - route to any agent for final output'
                suggested_agent = next(iter(options.keys()))  # First available agent
        elif plan.has_failed_steps():
            # Some steps failed - focus recovery on the first failed step.
            failed_steps = [
                step for step in plan.steps if step.status == StepStatus.FAILED
            ]
            failed_step = failed_steps[0]
            action = f"Handle failed step '{failed_step.id}' - route to {failed_step.agent} for retry"
            suggested_agent = failed_step.agent
        elif next_steps:
            # There are steps ready to execute - run the first one.
            next_step = next_steps[0]
            action = f"Execute step '{next_step.id}' - route to {next_step.agent}"
            suggested_agent = next_step.agent
        else:
            # Pending steps exist but none are ready (blocked on dependencies).
            action = f'Waiting for dependencies - route to {self.executor_agent} for status check'
            suggested_agent = self.executor_agent

        options_text = '\n'.join(
            [f'- {name}: {desc}' for name, desc in options.items()]
        )

        # Add execution context info.
        context_info = ''
        if execution_context:
            current_node = execution_context.get('current_node', 'unknown')
            iteration = execution_context.get('iteration_count', 0)

            context_info = f"""
πŸ’‘ EXECUTION CONTEXT:
Current node: {current_node} (iteration {iteration})
"""

        prompt = f"""You are coordinating plan execution in a plan-and-execute workflow.

πŸ“‹ EXECUTION PLAN: {plan.title}
{plan.description}

πŸ“Š CURRENT PROGRESS:
{progress_text}

🎯 NEXT ACTION: {action}
🎯 SUGGESTED AGENT: {suggested_agent}
{context_info}
Available agents:
{options_text}

Recent conversation:
{conversation_text}

Instructions:
1. Follow the execution plan step by step
2. Route to the suggested agent: {suggested_agent}
3. Each agent will execute their assigned step
4. Continue until all steps are completed
5. Respond with ONLY the agent name (no explanations)

Next agent:"""

        return prompt
Args: - router_type: Type of router ("smart", "task_classifier", "conversation_analysis") + router_type: Type of router ("smart", "task_classifier", "conversation_analysis", "reflection", "plan_execute") **config: Configuration specific to the router type Returns: @@ -457,6 +835,26 @@ def create_llm_router(router_type: str, **config) -> Callable[[BaseMemory], str] } } ) + + # Reflection router for A -> B -> A -> C patterns + router = create_llm_router( + "reflection", + flow_pattern=["main_agent", "critic", "main_agent", "final_agent"], + allow_early_exit=False + ) + + # Plan-Execute router for Cursor-style workflows + router = create_llm_router( + "plan_execute", + agents={ + "planner": "Creates detailed execution plans", + "developer": "Implements code and features", + "tester": "Tests implementations", + "reviewer": "Reviews final results" + }, + planner_agent="planner", + executor_agent="developer" + ) """ if router_type == 'smart': @@ -481,6 +879,18 @@ def create_llm_router(router_type: str, **config) -> Callable[[BaseMemory], str] router_instance = ConversationAnalysisRouter(**config) + elif router_type == 'reflection': + if 'flow_pattern' not in config: + raise ValueError("ReflectionRouter requires 'flow_pattern' parameter") + + router_instance = ReflectionRouter(**config) + + elif router_type == 'plan_execute': + if 'agents' not in config: + raise ValueError("PlanExecuteRouter requires 'agents' parameter") + + router_instance = PlanExecuteRouter(**config) + else: raise ValueError(f'Unknown router type: {router_type}') @@ -506,6 +916,11 @@ async def router_function(memory: BaseMemory, execution_context: dict = None): # Add proper type annotations for validation router_function.__annotations__ = {'memory': BaseMemory, 'return': literal_type} + # Transfer router instance attributes to the function for validation + router_function.supports_self_reference = getattr( + router_instance, 'supports_self_reference', False + ) + return router_function @@ -658,3 
def create_main_critic_reflection_router(
    main_agent: str = 'main_agent',
    critic_agent: str = 'critic',
    final_agent: str = 'final_agent',
    allow_early_exit: bool = False,
    llm: 'Optional[BaseLLM]' = None,
) -> 'Callable[[BaseMemory], str]':
    """
    Create a router for the A -> B -> A -> C reflection pattern (main -> critic -> main -> final).

    Args:
        main_agent: Name of the main agent (appears twice in pattern)
        critic_agent: Name of the critic agent for reflection
        final_agent: Name of the final agent
        allow_early_exit: Whether to allow LLM to exit reflection early if appropriate
        llm: LLM instance for routing decisions

    Returns:
        Router function for main/critic/final reflection workflows
    """
    return create_llm_router(
        'reflection',
        flow_pattern=[main_agent, critic_agent, main_agent, final_agent],
        allow_early_exit=allow_early_exit,
        llm=llm,
    )


def create_plan_execute_router(
    planner_agent: str = 'planner',
    executor_agent: str = 'executor',
    reviewer_agent: 'Optional[str]' = None,
    additional_agents: 'Optional[Dict[str, str]]' = None,
    llm: 'Optional[BaseLLM]' = None,
) -> 'Callable[[BaseMemory], str]':
    """
    Create a router for plan-and-execute workflows like Cursor.

    Args:
        planner_agent: Name of the agent responsible for creating plans
        executor_agent: Name of the agent responsible for executing steps
        reviewer_agent: Optional name of the agent responsible for reviewing results
        additional_agents: Additional agents that can be used in execution steps
        llm: LLM instance for routing decisions

    Returns:
        Router function for plan-execute workflows
    """
    agents = {
        planner_agent: 'Creates detailed execution plans by breaking down tasks into sequential steps',
        executor_agent: 'Executes individual steps from the execution plan',
    }

    if reviewer_agent:
        agents[reviewer_agent] = 'Reviews and validates completed work'

    # Extra specialist agents merge last so callers can override descriptions.
    if additional_agents:
        agents.update(additional_agents)

    return create_llm_router(
        'plan_execute',
        agents=agents,
        planner_agent=planner_agent,
        executor_agent=executor_agent,
        reviewer_agent=reviewer_agent,
        llm=llm,
    )


# Backward compatibility alias
def create_main_critic_flow_router(
    main_agent: str = 'main_agent',
    critic_agent: str = 'critic',
    final_agent: str = 'final_agent',
    allow_early_exit: bool = False,
    llm: 'Optional[BaseLLM]' = None,
) -> 'Callable[[BaseMemory], str]':
    """
    DEPRECATED: Use create_main_critic_reflection_router instead.
    Create a router for the A -> B -> A -> C reflection pattern (main -> critic -> main -> final).
    """
    return create_main_critic_reflection_router(
        main_agent=main_agent,
        critic_agent=critic_agent,
        final_agent=final_agent,
        allow_early_exit=allow_early_exit,
        llm=llm,
    )


# Define the generic type variable
T = TypeVar('T')


class StepStatus(Enum):
    """Status of a plan step"""

    PENDING = 'pending'
    IN_PROGRESS = 'in_progress'
    COMPLETED = 'completed'
    FAILED = 'failed'
    SKIPPED = 'skipped'


@dataclass
class PlanStep:
    """Represents a single step in an execution plan"""

    id: str
    description: str
    agent: str  # Which agent should execute this step
    dependencies: List[str] = field(default_factory=list)  # Step IDs this depends on
    status: StepStatus = StepStatus.PENDING
    result: Optional[str] = None  # Output captured when the step completes
    error: Optional[str] = None  # Error message captured when the step fails
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ExecutionPlan:
    """Represents a complete execution plan"""

    id: str
    title: str
    description: str
    steps: List[PlanStep] = field(default_factory=list)
    created_by: str = 'planner'
    status: str = 'active'  # active, completed, failed, paused
    metadata: Dict[str, Any] = field(default_factory=dict)

    def get_next_steps(self) -> List[PlanStep]:
        """Get steps that are ready to execute (pending with no pending dependencies).

        A dependency id that does not match any step in the plan counts as
        unsatisfied, so the dependent step is never scheduled. (Previously a
        dangling id raised AttributeError via ``get_step(...).status`` on None.)
        """
        next_steps = []
        for step in self.steps:
            if step.status != StepStatus.PENDING:
                continue
            deps = [self.get_step(dep_id) for dep_id in step.dependencies]
            if all(
                dep is not None and dep.status == StepStatus.COMPLETED
                for dep in deps
            ):
                next_steps.append(step)
        return next_steps

    def get_step(self, step_id: str) -> Optional[PlanStep]:
        """Get a step by ID, or None if no step has that ID."""
        for step in self.steps:
            if step.id == step_id:
                return step
        return None

    def mark_step_completed(self, step_id: str, result: Optional[str] = None):
        """Mark a step as completed, storing its result. No-op for unknown IDs."""
        step = self.get_step(step_id)
        if step:
            step.status = StepStatus.COMPLETED
            step.result = result

    def mark_step_failed(self, step_id: str, error: Optional[str] = None):
        """Mark a step as failed, storing the error. No-op for unknown IDs."""
        step = self.get_step(step_id)
        if step:
            step.status = StepStatus.FAILED
            step.error = error

    def is_completed(self) -> bool:
        """Check if all steps are completed (vacuously True for an empty plan)."""
        return all(step.status == StepStatus.COMPLETED for step in self.steps)

    def has_failed_steps(self) -> bool:
        """Check if any steps have failed."""
        return any(step.status == StepStatus.FAILED for step in self.steps)


class BaseMemory(ABC, Generic[T]):
    """Abstract memory interface: message storage plus optional plan support."""

    @abstractmethod
    def add(self, m: T):
        """Append an item to memory."""
        pass

    @abstractmethod
    def get(self) -> List[T]:
        """Return all stored items."""
        pass

    # Plan management methods (optional - only implemented by memory classes that support plans)
    def add_plan(self, plan: ExecutionPlan):
        """Add an execution plan (override in subclasses that support plans)"""
        raise NotImplementedError('This memory type does not support plans')

    def get_current_plan(self) -> Optional[ExecutionPlan]:
        """Get the current active plan (override in subclasses that support plans)"""
        return None

    def update_plan(self, plan: ExecutionPlan):
        """Update an existing plan (override in subclasses that support plans)"""
        raise NotImplementedError('This memory type does not support plans')

    def get_plan(self, plan_id: str) -> Optional[ExecutionPlan]:
        """Get a plan by ID (override in subclasses that support plans)"""
        return None


class MessageMemory(BaseMemory[Dict[str, str]]):
    """Simple message-only memory (no plan support)."""

    def __init__(self):
        # NOTE(review): bodies of __init__/add fall on a diff hunk boundary in
        # the patch; reconstructed from the visible get() and the PlanAwareMemory
        # mirror - confirm against the original file.
        self.messages = []

    def add(self, message: Dict[str, str]):
        """Append a message to memory."""
        self.messages.append(message)

    def get(self) -> List[Dict[str, str]]:
        """Return all stored messages."""
        return self.messages


class PlanAwareMemory(BaseMemory[Dict[str, str]]):
    """Enhanced memory that supports both messages and execution plans"""

    def __init__(self):
        self.messages = []
        self.plans: Dict[str, ExecutionPlan] = {}
        self.current_plan_id: Optional[str] = None

    def add(self, message: Dict[str, str]):
        """Add a message to memory"""
        self.messages.append(message)

    def get(self) -> List[Dict[str, str]]:
        """Get all messages"""
        return self.messages

    # Plan management methods
    def add_plan(self, plan: ExecutionPlan):
        """Add an execution plan and set it as current"""
        self.plans[plan.id] = plan
        self.current_plan_id = plan.id

    def get_current_plan(self) -> Optional[ExecutionPlan]:
        """Get the current active plan"""
        if self.current_plan_id and self.current_plan_id in self.plans:
            return self.plans[self.current_plan_id]
        return None

    def update_plan(self, plan: ExecutionPlan):
        """Update an existing plan"""
        if plan.id in self.plans:
            self.plans[plan.id] = plan
        else:
            raise ValueError(f'Plan {plan.id} not found in memory')

    def get_plan(self, plan_id: str) -> Optional[ExecutionPlan]:
        """Get a plan by ID"""
        return self.plans.get(plan_id)

    def set_current_plan(self, plan_id: str):
        """Set the current active plan"""
        if plan_id in self.plans:
            self.current_plan_id = plan_id
        else:
            raise ValueError(f'Plan {plan_id} not found in memory')

    def get_all_plans(self) -> List[ExecutionPlan]:
        """Get all plans"""
        return list(self.plans.values())

    def remove_plan(self, plan_id: str):
        """Remove a plan from memory; clears the current pointer if it matched."""
        if plan_id in self.plans:
            del self.plans[plan_id]
            if self.current_plan_id == plan_id:
                self.current_plan_id = None