diff --git a/.github/workflows/examples.yml b/.github/workflows/examples.yml index 4053d60..3f34ebe 100644 --- a/.github/workflows/examples.yml +++ b/.github/workflows/examples.yml @@ -14,6 +14,8 @@ env: jobs: examples: + # TODO: we should run only in prod + environment: staging runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 diff --git a/README.md b/README.md index 0cfba9f..cd90be3 100644 --- a/README.md +++ b/README.md @@ -626,6 +626,10 @@ validation that is too strict can lead to invalid generations. In case of an inv def my_agent(_: Input) -> :... ``` +## Workflows + +For advanced workflow patterns and examples, please refer to the [Workflows README](examples/workflows/README.md) for more details. + ## Contributing See the [CONTRIBUTING.md](./CONTRIBUTING.md) file for more details. diff --git a/examples/workflows/README.md b/examples/workflows/README.md new file mode 100644 index 0000000..478390b --- /dev/null +++ b/examples/workflows/README.md @@ -0,0 +1,65 @@ +# Workflows Patterns + +This README describes the five main patterns used in our workflows built using the WorkflowAI SDK. These patterns provide a structured method for composing complex AI tasks from simpler components, and allow you to balance flexibility and control in your AI applications. + +## 1. Sequential Processing (Chains) +In this pattern, tasks are executed in a fixed sequence, where the output of one step becomes the input for the next. This is ideal for linear processes such as content generation, data transformation, or any task that benefits from a clear, step-by-step flow. + +**Example:** +- Generate an initial result (e.g., marketing copy). +- Evaluate and refine that result through subsequent steps. + +For an implementation example, see [chain.py](chain.py). + +## 2. Routing +The routing pattern directs work based on intermediate results. An initial decision or classification determines which specialized agent should handle the next part of the workflow. This allows the system to adapt its behavior based on context (for instance, routing customer queries to different support teams). + +**Example:** +- Classify a customer query (e.g., as general, refund, or technical). +- Route the query to a specialized agent that handles that particular type. + +For an implementation example, see [routing.py](routing.py). + +## 3. Parallel Processing +Parallel processing splits work into independent subtasks that run concurrently. This pattern is used when different aspects of an input can be handled independently, leading to faster overall processing times. + +**Example:** +- Perform security, performance, and maintainability reviews on code simultaneously. +- Collect and aggregate the results after all tasks complete. + +For an implementation example, see [parallel_processing.py](parallel_processing.py). + +## 4. Orchestrator-Worker +In the orchestrator-worker pattern, one agent (the orchestrator) plans and coordinates the work, while multiple worker agents execute the individual parts of the plan in parallel. This pattern is particularly useful when a task can be decomposed into distinct planning and execution phases. + +**Example:** +- An orchestrator analyzes a feature request and generates an implementation plan with details on file changes. +- Worker agents then execute the planned changes concurrently. + +For an implementation example, see [orchestrator_worker.py](orchestrator_worker.py). + +## 5. Evaluator-Optimizer +The evaluator-optimizer pattern employs an iterative feedback loop. An initial output is evaluated for quality, and based on the feedback, improvements are made. This cycle is repeated until the output reaches the desired quality, or a maximum number of iterations is met. + +**Example:** +- Translate text using a fast initial model. +- Evaluate the translation's quality, tone, nuance, and cultural accuracy. +- If needed, refine the translation based on detailed feedback and repeat the process. + +For an implementation example, see [evaluator_optimizer.py](evaluator_optimizer.py). + +## 6. Chain of Agents (Long Context Processing) +The Chain of Agents pattern is designed for processing long documents or complex tasks that exceed the context window of a single model. In this pattern, multiple worker agents process different chunks of the input sequentially, passing their findings through the chain, while a manager agent synthesizes the final output. + +**Example:** +- Split a long document into manageable chunks +- Worker agents process each chunk sequentially, building upon previous findings +- A manager agent synthesizes all findings into a final, coherent response + +For an implementation example, see [chain_of_agents.py](chain_of_agents.py). + +--- + +These patterns were inspired by the workflow patterns described in the [Vercel AI SDK Documentation](https://sdk.vercel.ai/docs/foundations/agents#patterns-with-examples) and research from organizations like [Google Research](https://research.google/blog/chain-of-agents-large-language-models-collaborating-on-long-context-tasks/). + +This README should serve as a high-level guide to understanding and using the different patterns available in our workflows. diff --git a/examples/workflows/chain.py b/examples/workflows/chain.py new file mode 100644 index 0000000..a77df96 --- /dev/null +++ b/examples/workflows/chain.py @@ -0,0 +1,149 @@ +import asyncio +from typing import TypedDict + +from pydantic import BaseModel, Field # pyright: ignore [reportUnknownVariableType] + +import workflowai +from workflowai import Model + + +class MarketingCopyInput(BaseModel): + """The product or concept for which to generate marketing copy.""" + + idea: str = Field(description="A short description or name of the product.") + + +class MarketingCopyOutput(BaseModel): + """Contains the AI generated marketing copy text for the provided product or concept.""" + + marketing_text: str = Field(description="Persuasive marketing copy text.") + + +@workflowai.agent(id="marketing-copy-generator", model=Model.GPT_4O_MINI_LATEST) +async def generate_marketing_copy_agent(_: MarketingCopyInput) -> MarketingCopyOutput: + """ + Write persuasive marketing copy for the provided idea. + Focus on benefits and emotional appeal. + """ + ... + + +class EvaluateCopyInput(BaseModel): + """Input type for evaluating the quality of marketing copy.""" + + marketing_text: str = Field(description="The marketing copy text to evaluate.") + + +class EvaluateCopyOutput(BaseModel): + """Evaluation results for the marketing copy.""" + + has_call_to_action: bool = Field(description="Whether a call to action is present.") + emotional_appeal: int = Field(description="Emotional appeal rating (1-10).") + clarity: int = Field(description="Clarity rating (1-10).") + + +# We use a smarter model (O1) to review the copy since evaluation requires more nuanced understanding +@workflowai.agent(id="marketing-copy-evaluator", model=Model.O1_MINI_LATEST) +async def evaluate_marketing_copy_agent(_: EvaluateCopyInput) -> EvaluateCopyOutput: + """ + Evaluate the marketing copy for: + 1) Presence of a call to action (true/false) + 2) Emotional appeal (1-10) + 3) Clarity (1-10) + Return the results as a structured output. + """ + ... + + +class RewriteCopyInput(BaseModel): + """Input for rewriting the marketing copy with targeted improvements.""" + + original_copy: str = Field(default="", description="Original marketing copy.") + add_call_to_action: bool = Field(default=False, description="Whether we need a clear call to action.") + strengthen_emotional_appeal: bool = Field(default=False, description="Whether emotional appeal needs a boost.") + improve_clarity: bool = Field(default=False, description="Whether clarity needs improvement.") + + +class RewriteCopyOutput(BaseModel): + """Contains the improved marketing copy.""" + + marketing_text: str = Field(description="The improved marketing copy text.") + + +# Claude 3.5 Sonnet is a more powerful model for copywriting +@workflowai.agent(model=Model.CLAUDE_3_5_SONNET_LATEST) +async def rewrite_marketing_copy_agent(_: RewriteCopyInput) -> RewriteCopyOutput: + """ + Rewrite the marketing copy with the specified improvements: + - A clear CTA if requested + - Stronger emotional appeal if requested + - Improved clarity if requested + """ + ... + + +class MarketingResult(TypedDict): + original_copy: str + final_copy: str + was_improved: bool + quality_metrics: EvaluateCopyOutput + + +async def generate_marketing_copy(idea: str) -> MarketingResult: + """ + Demonstrates a chain flow: + 1) Generate an initial marketing copy. + 2) Evaluate its quality. + 3) If the quality is lacking, request a rewrite with clearer CTA, stronger emotional appeal, and/or clarity. + 4) Return the final copy and metrics. + """ + # Step 1: Generate initial copy + generation = await generate_marketing_copy_agent(MarketingCopyInput(idea=idea)) + original_copy = generation.marketing_text + final_copy = original_copy + + # Step 2: Evaluate the copy + evaluation = await evaluate_marketing_copy_agent(EvaluateCopyInput(marketing_text=original_copy)) + + # Step 3: Check evaluation results. If below thresholds, rewrite + needs_improvement = not evaluation.has_call_to_action or evaluation.emotional_appeal < 7 or evaluation.clarity < 7 + + if needs_improvement: + rewrite = await rewrite_marketing_copy_agent( + RewriteCopyInput( + original_copy=original_copy, + add_call_to_action=not evaluation.has_call_to_action, + strengthen_emotional_appeal=evaluation.emotional_appeal < 7, + improve_clarity=evaluation.clarity < 7, + ), + ) + final_copy = rewrite.marketing_text + + return { + "original_copy": original_copy, + "final_copy": final_copy, + "was_improved": needs_improvement, + "quality_metrics": evaluation, + } + + +if __name__ == "__main__": + idea = "A open-source platform for AI agents" + result = asyncio.run(generate_marketing_copy(idea)) + + print("\n=== Input Idea ===") + print(idea) + + print("\n=== Marketing Copy ===") + print(result["original_copy"]) + + print("\n=== Quality Assessment ===") + metrics = result["quality_metrics"] + print(f"āœ“ Call to Action: {'Present' if metrics.has_call_to_action else 'Missing'}") + print(f"āœ“ Emotional Appeal: {metrics.emotional_appeal}/10") + print(f"āœ“ Clarity: {metrics.clarity}/10") + + if result["was_improved"]: + print("\n=== Improved Marketing Copy ===") + print(result["final_copy"]) + print() diff --git a/examples/workflows/chain_of_agents.py b/examples/workflows/chain_of_agents.py new file mode 100644 index 0000000..6bdd169 --- /dev/null +++ b/examples/workflows/chain_of_agents.py @@ -0,0 +1,204 @@ +""" +This implementation is inspired by Google Research's Chain of Agents (CoA) framework: +https://research.google/blog/chain-of-agents-large-language-models-collaborating-on-long-context-tasks/ + +CoA is a training-free, task-agnostic framework for handling long-context tasks through LLM collaboration. +Instead of trying to fit everything into a single context window, it: +1. Breaks input into manageable chunks +2. Uses worker agents to process chunks sequentially, passing information forward +3. Uses a manager agent to synthesize the final answer + +Key benefits of this approach: +- More efficient than full-context processing (O(nk) vs O(n²) complexity) +- Better performance than RAG for complex reasoning tasks +- Scales well with input length (performance improves with longer inputs) +- No training required, works with any LLM +""" + +import asyncio +from typing import List + +from pydantic import BaseModel, Field # pyright: ignore [reportUnknownVariableType] + +import workflowai +from workflowai import Model + + +class DocumentChunk(BaseModel): + """A chunk of text from a long document.""" + + content: str = Field(description="The content of this document chunk.") + + +class WorkerInput(BaseModel): + """Input for a worker agent processing a document chunk.""" + + chunk: DocumentChunk = Field(description="The current chunk to process.") + query: str = Field(description="The query or task to be answered.") + previous_findings: str = Field( + default="", + description="Accumulated findings from previous workers.", + ) + + +class WorkerOutput(BaseModel): + """Output from a worker agent containing findings and evidence.""" + + findings: str = Field( + description="Key findings and information extracted from this chunk.", + ) + evidence: List[str] = Field( + default_factory=list, + description="Supporting evidence or quotes from the chunk.", + ) + message_for_next: str = Field( + description="Message to pass to the next worker with relevant context.", + ) + + +class ManagerInput(BaseModel): + """Input for the manager agent to generate final response.""" + + query: str = Field(description="The original query to answer.") + accumulated_findings: str = Field( + description="All findings accumulated through the worker chain.", + ) + + +class ManagerOutput(BaseModel): + """Final output from the manager agent.""" + + answer: str = Field(description="The final answer to the query.") + reasoning: str = Field(description="Explanation of how the answer was derived.") + supporting_evidence: List[str] = Field( + default_factory=list, + description="Key evidence supporting the answer.", + ) + + +@workflowai.agent(id="document-worker", model=Model.GPT_4O_MINI_LATEST) +async def worker_agent(_: WorkerInput) -> WorkerOutput: + """ + Process your assigned document chunk to: + 1. Extract relevant information related to the query + 2. Build upon findings from previous workers + 3. Pass important context to the next worker + + Be concise but thorough in your analysis. + Focus on information that could be relevant to the query. + """ + ... + + +@workflowai.agent(id="document-manager", model=Model.CLAUDE_3_5_SONNET_LATEST) +async def manager_agent(_: ManagerInput) -> ManagerOutput: + """ + Synthesize all findings from the worker agents to: + 1. Generate a comprehensive answer to the query + 2. Provide clear reasoning for your answer + 3. Include supporting evidence from the document + + Ensure your answer is well-supported by the accumulated findings. + """ + ... + + +async def process_long_document( + document: str, + query: str, + chunk_size: int = 2000, +) -> ManagerOutput: + """ + Process a long document using Chain of Agents pattern: + 1. Split document into chunks + 2. Have worker agents process each chunk sequentially + 3. Pass findings through the chain + 4. Have manager agent synthesize final answer + """ + # Split document into chunks + chunks = [document[i : i + chunk_size] for i in range(0, len(document), chunk_size)] + + # Convert to DocumentChunk objects + doc_chunks = [DocumentChunk(content=chunk) for chunk in chunks] + + # Initialize accumulator for findings + accumulated_findings = "" + + # Process chunks sequentially through worker chain + for chunk in doc_chunks: + worker_input = WorkerInput( + chunk=chunk, + query=query, + previous_findings=accumulated_findings, + ) + + # Get worker's output + worker_output = await worker_agent(worker_input) + + # Accumulate findings + if accumulated_findings: + accumulated_findings += "\n\n" + accumulated_findings += f"Findings:\n{worker_output.findings}" + + if worker_output.evidence: + accumulated_findings += "\nEvidence:\n- " + accumulated_findings += "\n- ".join(worker_output.evidence) + + # Have manager synthesize final answer + manager_input = ManagerInput( + query=query, + accumulated_findings=accumulated_findings, + ) + + return await manager_agent(manager_input) + + +async def main(): + # Example long document + document = """ + The Industrial Revolution was a period of major industrialization and innovation during the late 18th and early 19th centuries. It began in Britain and later spread to other parts of the world. This era marked a major turning point in human history, fundamentally changing economic and social organization. Steam power, mechanization, and new manufacturing processes revolutionized production methods. The introduction of new technologies and manufacturing techniques led to unprecedented increases in productivity and efficiency. + + The development of new manufacturing processes led to significant changes in how goods were produced. The factory system replaced cottage industries, bringing workers into centralized locations. New iron production processes using coke instead of charcoal dramatically increased output. Textile manufacturing was transformed by inventions like the spinning jenny and power loom. These changes enabled mass production of goods at unprecedented scales. The mechanization of agriculture through innovations like the seed drill and threshing machine reduced the labor needed for farming while increasing food production. + + Social and economic impacts were profound, affecting both urban and rural life. Cities grew rapidly as people moved from rural areas to work in factories. Working conditions were often harsh, with long hours, dangerous conditions, and child labor being common. A new middle class emerged, while traditional skilled craftsmen often struggled. Living conditions in industrial cities were frequently overcrowded and unsanitary, leading to disease outbreaks. The rise of labor movements and trade unions began in response to poor working conditions and low wages. Education systems were developed to provide workers with basic literacy and numeracy skills needed in the new industrial economy. + + Environmental consequences became apparent as industrialization progressed. Coal burning led to severe air pollution in industrial cities, creating smog and respiratory problems. Rivers became contaminated with industrial waste and raw sewage. Deforestation increased as wood was needed for construction and fuel. The landscape was transformed by mines, factories, and expanding urban areas. These changes marked the beginning of large-scale human impact on the environment, leading to problems that would only be recognized and addressed much later. + + Transportation and communication systems underwent revolutionary changes during this period. The development of railways, steam-powered ships, and improved road networks facilitated the movement of goods and people. The telegraph enabled rapid long-distance communication for the first time. These advances in transportation and communication helped create larger markets and more integrated economies. + + The Industrial Revolution also had significant cultural and intellectual impacts. Scientific and technological progress led to increased faith in human capability and reason. The Enlightenment ideals of progress and rationality found practical expression in industrial innovations. New forms of urban culture emerged, while traditional rural ways of life declined. The period saw the rise of new philosophical and political movements, including socialism, in response to industrial conditions. + + Public health and medicine saw both challenges and advances during this period. The concentration of population in cities led to serious health problems, including cholera epidemics and tuberculosis outbreaks. However, these challenges eventually spurred improvements in urban sanitation and medical knowledge. The development of public health systems and modern medical practices can be traced to this era. + + The Industrial Revolution's impact on agriculture extended beyond mechanization. New farming techniques and crop rotation methods increased agricultural productivity. The application of scientific principles to farming led to improved livestock breeding and crop yields. These agricultural advances were necessary to feed growing urban populations and free up labor for industrial work. + + The financial and business systems also evolved significantly. New forms of business organization, such as joint-stock companies, emerged to handle larger-scale industrial operations. Banking and credit systems expanded to provide capital for industrial growth. Insurance companies developed to manage the risks of industrial enterprises. These financial innovations helped fuel continued industrial expansion. + + The role of women in society began to change during this period. While many women worked in factories under difficult conditions, the Industrial Revolution also created new opportunities for women's employment in sectors like textile manufacturing. Middle-class women often found themselves managing increasingly complex households. The seeds of women's rights movements were planted during this era. + + The global impact of the Industrial Revolution was profound and long-lasting. As industrialization spread from Britain to other countries, it created new patterns of international trade and competition. Colonial empires expanded as industrial nations sought raw materials and markets. The technological and economic gaps between industrialized and non-industrialized regions grew, shaping global relationships that persist to the present day. + + The Industrial Revolution's legacy continues to influence modern society. Many current environmental challenges, including climate change, can be traced to the industrial practices established during this period. Modern labor laws, education systems, and urban planning still reflect responses to Industrial Revolution conditions. Understanding this transformative period helps explain many aspects of contemporary life and the ongoing challenges we face. + """ # noqa: E501 + + query = "What were the major environmental impacts of the Industrial Revolution?" + + result = await process_long_document(document, query) + + print("\n=== Query ===") + print(query) + + print("\n=== Answer ===") + print(result.answer) + + print("\n=== Reasoning ===") + print(result.reasoning) + + if result.supporting_evidence: + print("\n=== Supporting Evidence ===") + for evidence in result.supporting_evidence: + print(f"- {evidence}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/workflows/evaluator_optimizer.py b/examples/workflows/evaluator_optimizer.py new file mode 100644 index 0000000..2867092 --- /dev/null +++ b/examples/workflows/evaluator_optimizer.py @@ -0,0 +1,170 @@ +import asyncio +from typing import List, TypedDict + +from pydantic import BaseModel, Field # pyright: ignore [reportUnknownVariableType] + +import workflowai +from workflowai import Model + + +class TranslationInput(BaseModel): + """Input for translation.""" + + text: str = Field(description="The text to translate.") + target_language: str = Field(description="The target language for translation.") + + +class TranslationOutput(BaseModel): + """Output containing the translated text.""" + + translation: str = Field(description="The translated text.") + + +# Uses GPT-4O Mini for fast initial translation +@workflowai.agent(id="initial-translator", model=Model.GPT_4O_MINI_LATEST) +async def initial_translation_agent(_: TranslationInput) -> TranslationOutput: + """ + Expert literary translator. + Translate text while preserving tone and cultural nuances. + """ + ... + + +class TranslationEvaluationInput(BaseModel): + """Input for evaluating translation quality.""" + + original_text: str = Field(description="The original text.") + translation: str = Field(description="The translation to evaluate.") + target_language: str = Field(description="The target language of the translation.") + + +class TranslationEvaluationOutput(BaseModel): + """Output containing the translation evaluation.""" + + quality_score: int = Field(description="Overall quality score (1-10).") + preserves_tone: bool = Field(description="Whether the translation preserves the original tone.") + preserves_nuance: bool = Field(description="Whether the translation preserves subtle nuances.") + culturally_accurate: bool = Field(description="Whether the translation is culturally appropriate.") + specific_issues: List[str] = Field(description="List of specific issues identified.") + improvement_suggestions: List[str] = Field(description="List of suggested improvements.") + + +# Uses O1 for its strong analytical and evaluation capabilities +@workflowai.agent(id="translation-evaluator", model=Model.O1_2024_12_17_HIGH_REASONING_EFFORT) +async def evaluate_translation_agent(_: TranslationEvaluationInput) -> TranslationEvaluationOutput: + """ + Expert in evaluating literary translations. + Evaluate translations for quality, tone preservation, nuance, and cultural accuracy. + """ + ... + + +class TranslationImprovementInput(BaseModel): + """Input for improving translation based on feedback.""" + + original_text: str = Field(description="The original text.") + current_translation: str = Field(description="The current translation.") + target_language: str = Field(description="The target language.") + specific_issues: List[str] = Field(description="Issues to address.") + improvement_suggestions: List[str] = Field(description="Suggestions for improvement.") + + +class TranslationImprovementOutput(BaseModel): + """Output containing the improved translation.""" + + translation: str = Field(description="The improved translation.") + + +# Uses GPT-4O for high-quality translation refinement +@workflowai.agent(id="translation-improver", model=Model.GPT_4O_LATEST) +async def improve_translation_agent(_: TranslationImprovementInput) -> TranslationImprovementOutput: + """ + Expert literary translator. + Improve translations based on specific feedback while maintaining overall quality. + """ + ... + + +class TranslationResult(TypedDict): + final_translation: str + iterations_required: int + + +async def translate_with_feedback(text: str, target_language: str) -> TranslationResult: + """ + Translate text with iterative feedback and improvement: + 1. Generate initial translation with a faster model + 2. Evaluate translation quality + 3. If quality threshold not met, improve based on feedback + 4. Repeat evaluation-improvement cycle up to max_iterations + """ + max_iterations = 3 + iterations = 0 + + # Initial translation using faster model + current_translation = await initial_translation_agent( + TranslationInput( + text=text, + target_language=target_language, + ), + ) + + while iterations < max_iterations: + # Evaluate current translation + evaluation = await evaluate_translation_agent( + TranslationEvaluationInput( + original_text=text, + translation=current_translation.translation, + target_language=target_language, + ), + ) + + # Check if quality meets threshold + if ( + evaluation.quality_score >= 8 + and evaluation.preserves_tone + and evaluation.preserves_nuance + and evaluation.culturally_accurate + ): + break + + # Generate improved translation based on feedback + improved = await improve_translation_agent( + TranslationImprovementInput( + original_text=text, + current_translation=current_translation.translation, + target_language=target_language, + specific_issues=evaluation.specific_issues, + improvement_suggestions=evaluation.improvement_suggestions, + ), + ) + + current_translation.translation = improved.translation + iterations += 1 + + return { + "final_translation": current_translation.translation, + "iterations_required": iterations, + } + + +if __name__ == "__main__": + # Example text to translate + text = """ + The old bookstore, nestled in the heart of the city, + was a sanctuary for bibliophiles. Its wooden shelves, + worn smooth by decades of searching hands, held stories + in dozens of languages. The owner, a silver-haired woman + with kind eyes, knew each book's location by heart. + """ + target_language = "French" + + result = asyncio.run(translate_with_feedback(text, target_language)) + + print("\n=== Translation Result ===") + print("\nOriginal Text:") + print(text) + print("\nFinal Translation:") + print(result["final_translation"]) + print(f"\nIterations Required: {result['iterations_required']}") + print() diff --git a/examples/workflows/orchestrator_worker.py b/examples/workflows/orchestrator_worker.py new file mode 100644 index 0000000..abe3fd5 --- /dev/null +++ b/examples/workflows/orchestrator_worker.py @@ -0,0 +1,165 @@ +import asyncio +from enum import Enum +from typing import List, TypedDict + +from pydantic import BaseModel, Field # pyright: ignore [reportUnknownVariableType] + +import workflowai +from workflowai import Model + + +class ChangeType(str, Enum): + CREATE = "create" + MODIFY = "modify" + DELETE = "delete" + + +class ComplexityLevel(str, Enum): + LOW = "low" + MEDIUM = "medium" + HIGH = "high" + + +class FileChange(BaseModel): + """Describes a planned change to a file.""" + + purpose: str = Field(description="Purpose of the change in the context of the feature.") + file_path: str = Field(description="Path to the file to be changed.") + change_type: ChangeType = Field(description="Type of change to be made.") + + +class ImplementationPlanInput(BaseModel): + """Input for generating an implementation plan.""" + + feature_request: str = Field(description="The feature request to implement.") + + +class ImplementationPlanOutput(BaseModel): + """Output containing the implementation plan.""" + + files: List[FileChange] = Field(description="List of files to be changed.") + estimated_complexity: ComplexityLevel = Field(description="Estimated complexity of the implementation.") + + +# Uses O1 for its strong architectural planning capabilities +@workflowai.agent(id="implementation-planner", model=Model.O1_2024_12_17_HIGH_REASONING_EFFORT) +async def plan_implementation_agent(_: ImplementationPlanInput) -> ImplementationPlanOutput: + """ + Senior software architect planning feature implementations. + Analyze feature requests and create detailed implementation plans. + """ + ... + + +class FileImplementationInput(BaseModel): + """Input for implementing file changes.""" + + file_path: str = Field(description="Path to the file being changed.") + purpose: str = Field(description="Purpose of the change.") + feature_request: str = Field(description="Overall feature context.") + change_type: ChangeType = Field(description="Type of change being made.") + + +class FileImplementationOutput(BaseModel): + """Output containing the implemented changes.""" + + explanation: str = Field(description="Explanation of the implemented changes.") + code: str = Field(description="The implemented code changes.") + + +# Uses GPT-4O for its strong code generation and modification capabilities +@workflowai.agent(id="file-implementer", model=Model.GPT_4O_LATEST) +async def implement_file_changes_agent(_: FileImplementationInput) -> FileImplementationOutput: + """ + Expert at implementing code changes based on the change type: + - CREATE: Implement new files following best practices and project patterns + - MODIFY: Modify existing code while maintaining consistency + - DELETE: Safely remove code while ensuring no breaking changes + """ + ... + + +class ImplementationChange(TypedDict): + file: FileChange + implementation: FileImplementationOutput + + +class FeatureImplementationResult(TypedDict): + plan: ImplementationPlanOutput + changes: List[ImplementationChange] + + +async def implement_feature(feature_request: str) -> FeatureImplementationResult: + """ + Implement a feature using an orchestrator-worker pattern: + 1. Orchestrator (planner) analyzes the request and creates an implementation plan + 2. Workers execute the planned changes in parallel + 3. Return both the plan and implemented changes + """ + # Orchestrator: Plan the implementation + implementation_plan = await plan_implementation_agent( + ImplementationPlanInput(feature_request=feature_request), + ) + + # Workers: Execute the planned changes in parallel + file_changes = await asyncio.gather( + *[ + implement_file_changes_agent( + FileImplementationInput( + file_path=file.file_path, + purpose=file.purpose, + feature_request=feature_request, + change_type=file.change_type, + ), + ) + for file in implementation_plan.files + ], + ) + + # Combine results + changes: List[ImplementationChange] = [ + { + "file": implementation_plan.files[i], + "implementation": change, + } + for i, change in enumerate(file_changes) + ] + + return { + "plan": implementation_plan, + "changes": changes, + } + + +if __name__ == "__main__": + # Example feature request + feature_request = """ + Add a new user authentication endpoint that: + 1. Accepts username/password + 2. Validates credentials + 3. Returns a JWT token + 4. Includes rate limiting + """ + + result = asyncio.run(implement_feature(feature_request)) + + print("\n=== Implementation Plan ===") + print(f"Estimated Complexity: {result['plan'].estimated_complexity}") + print("\nPlanned Changes:") + for file in result["plan"].files: + print(f"\n- {file.change_type.upper()}: {file.file_path}") + print(f" Purpose: {file.purpose}") + + print("\n=== Implemented Changes ===") + for change in result["changes"]: + file = change["file"] + impl = change["implementation"] + + print(f"\n=== {file.change_type.upper()}: {file.file_path} ===") + print("\nPurpose:") + print(file.purpose) + print("\nExplanation:") + print(impl.explanation) + print("\nCode:") + print(impl.code) + print() diff --git a/examples/workflows/parallel_processing.py b/examples/workflows/parallel_processing.py new file mode 100644 index 0000000..5b2f27c --- /dev/null +++ b/examples/workflows/parallel_processing.py @@ -0,0 +1,201 @@ +import asyncio +from enum import Enum +from typing import List, TypedDict + +from pydantic import BaseModel, Field # pyright: ignore [reportUnknownVariableType] + +import workflowai +from workflowai import Model + + +class RiskLevel(str, Enum): + LOW = "low" + MEDIUM = "medium" + HIGH = "high" + + +class SecurityReviewInput(BaseModel): + """Input for security code review.""" + + code: str = Field(description="The code to review for security issues.") + + +class SecurityReviewOutput(BaseModel): + """Output from security code review.""" + + vulnerabilities: List[str] = Field(description="List of identified security vulnerabilities.") + risk_level: RiskLevel = Field(description="Overall security risk level.") + suggestions: List[str] = Field(description="Security improvement suggestions.") + + +# Uses Claude 3.5 Sonnet for its strong security analysis capabilities +@workflowai.agent(id="security-reviewer", model=Model.CLAUDE_3_5_SONNET_LATEST) +async def security_review_agent(_: SecurityReviewInput) -> SecurityReviewOutput: + """ + Expert in code security. + Focus on identifying security vulnerabilities, injection risks, and authentication issues. + """ + ... + + +class PerformanceReviewInput(BaseModel): + """Input for performance code review.""" + + code: str = Field(description="The code to review for performance issues.") + + +class PerformanceReviewOutput(BaseModel): + """Output from performance code review.""" + + issues: List[str] = Field(description="List of identified performance issues.") + impact: RiskLevel = Field(description="Impact level of performance issues.") + optimizations: List[str] = Field(description="Performance optimization suggestions.") + + +# Uses O1 Mini for its expertise in performance optimization +@workflowai.agent(id="performance-reviewer", model=Model.O1_MINI_LATEST) +async def performance_review_agent(_: PerformanceReviewInput) -> PerformanceReviewOutput: + """ + Expert in code performance. + Focus on identifying performance bottlenecks, memory leaks, and optimization opportunities. + """ + ... + + +class MaintainabilityReviewInput(BaseModel): + """Input for maintainability code review.""" + + code: str = Field(description="The code to review for maintainability issues.") + + +class MaintainabilityReviewOutput(BaseModel): + """Output from maintainability code review.""" + + concerns: List[str] = Field(description="List of maintainability concerns.") + quality_score: int = Field(description="Code quality score (1-10).", ge=1, le=10) + recommendations: List[str] = Field(description="Maintainability improvement recommendations.") + + +# Uses Claude 3.5 Sonnet for its strong code quality and readability analysis +@workflowai.agent(id="maintainability-reviewer", model=Model.CLAUDE_3_5_SONNET_LATEST) +async def maintainability_review_agent(_: MaintainabilityReviewInput) -> MaintainabilityReviewOutput: + """ + Expert in code quality. + Focus on code structure, readability, and adherence to best practices. + """ + ... + + +class ReviewSummaryInput(BaseModel): + """Input for review summary generation.""" + + security_review: SecurityReviewOutput = Field(description="Security review results.") + performance_review: PerformanceReviewOutput = Field(description="Performance review results.") + maintainability_review: MaintainabilityReviewOutput = Field(description="Maintainability review results.") + + +class ReviewSummaryOutput(BaseModel): + """Output containing the summarized review.""" + + summary: str = Field(description="Concise summary of all reviews with key actions.") + + +# Uses O1 for its strong synthesis and summarization abilities +@workflowai.agent(id="review-summarizer", model=Model.O1_2024_12_17_HIGH_REASONING_EFFORT) +async def summarize_reviews_agent(_: ReviewSummaryInput) -> ReviewSummaryOutput: + """ + Technical lead summarizing multiple code reviews. + Synthesize review results into a concise summary with key actions. + """ + ... + + +class CodeReviewResult(TypedDict): + security_review: SecurityReviewOutput + performance_review: PerformanceReviewOutput + maintainability_review: MaintainabilityReviewOutput + summary: str + + +async def parallel_code_review(code: str) -> CodeReviewResult: + """ + Perform parallel code reviews using specialized agents: + 1. Security review for vulnerabilities and risks + 2. Performance review for optimization opportunities + 3. Maintainability review for code quality + 4. Synthesize results into an actionable summary + """ + # Run parallel reviews + security_review, performance_review, maintainability_review = await asyncio.gather( + security_review_agent(SecurityReviewInput(code=code)), + performance_review_agent(PerformanceReviewInput(code=code)), + maintainability_review_agent(MaintainabilityReviewInput(code=code)), + ) + + # Aggregate and summarize results + summary = await summarize_reviews_agent( + ReviewSummaryInput( + security_review=security_review, + performance_review=performance_review, + maintainability_review=maintainability_review, + ), + ) + + return { + "security_review": security_review, + "performance_review": performance_review, + "maintainability_review": maintainability_review, + "summary": summary.summary, + } + + +if __name__ == "__main__": + # Example code to review + code_to_review = """ + def process_user_input(user_input): + # Execute the input as a command + result = eval(user_input) + return result + + def cache_data(data): + # Store everything in memory + global_cache.append(data) + + def get_user_data(user_id): + # Query without parameterization + cursor.execute(f"SELECT * FROM users WHERE id = {user_id}") + return cursor.fetchall() + """ + + result = asyncio.run(parallel_code_review(code_to_review)) + + print("\n=== Security Review ===") + print(f"Risk Level: {result['security_review'].risk_level}") + print("\nVulnerabilities:") + for v in result["security_review"].vulnerabilities: + print(f"- {v}") + print("\nSuggestions:") + for s in result["security_review"].suggestions: + print(f"- {s}") + + print("\n=== Performance Review ===") + print(f"Impact Level: {result['performance_review'].impact}") + print("\nIssues:") + for i in result["performance_review"].issues: + print(f"- {i}") + print("\nOptimizations:") + for o in result["performance_review"].optimizations: + print(f"- {o}") + + print("\n=== Maintainability Review ===") + print(f"Quality Score: {result['maintainability_review'].quality_score}/10") + print("\nConcerns:") + for c in result["maintainability_review"].concerns: + print(f"- {c}") + print("\nRecommendations:") + for r in result["maintainability_review"].recommendations: + print(f"- {r}") + + print("\n=== Summary ===") + print(result["summary"]) + print() diff --git a/examples/workflows/routing.py b/examples/workflows/routing.py new file mode 100644 index 0000000..81a82f2 --- /dev/null +++ b/examples/workflows/routing.py @@ -0,0 +1,141 @@ +import asyncio +from enum import Enum +from typing import TypedDict + +from pydantic import BaseModel, Field # pyright: ignore [reportUnknownVariableType] + +import workflowai +from workflowai import Model + + +class QueryType(str, Enum): + GENERAL = "general" + REFUND = "refund" + TECHNICAL = "technical" + + +class ComplexityLevel(str, Enum): + SIMPLE = "simple" + COMPLEX = "complex" + + +class ClassificationInput(BaseModel): + """Input for query classification.""" + + query: str = Field(description="The customer query to classify.") + + +class ClassificationOutput(BaseModel): + """Output containing query classification details.""" + + reasoning: str = Field(description="Brief reasoning for the classification.") + type: QueryType = Field(description="Type of the query (general, refund, or technical).") + complexity: ComplexityLevel = Field(description="Complexity level of the query.") + + +# Uses GPT-4O for its strong analytical and classification capabilities +@workflowai.agent(id="query-classifier", model=Model.GPT_4O_LATEST) +async def classify_query_agent( + _: ClassificationInput, +) -> ClassificationOutput: + """ + Classify the customer query by: + 1. Query type (general, refund, or technical) + 2. Complexity (simple or complex) + 3. Provide brief reasoning for classification + """ + ... + + +class ResponseInput(BaseModel): + """Input for generating customer response.""" + + query: str = Field(description="The customer query to respond to.") + query_type: QueryType = Field(description="Type of the query for specialized handling.") + + +class ResponseOutput(BaseModel): + """Output containing the response to the customer.""" + + response: str = Field(description="The generated response to the customer query.") + + +# Uses Claude 3.5 Sonnet for its strong conversational abilities and empathy +@workflowai.agent(model=Model.CLAUDE_3_5_SONNET_LATEST) +async def handle_general_query(_: ResponseInput) -> ResponseOutput: + """Expert customer service agent handling general inquiries.""" + ... + + +# Uses GPT-4O Mini for efficient policy-based responses +@workflowai.agent(model=Model.GPT_4O_MINI_LATEST) +async def handle_refund_query(_: ResponseInput) -> ResponseOutput: + """Customer service agent specializing in refund requests.""" + ... + + +# Uses O1 Mini for its technical expertise and problem-solving capabilities +@workflowai.agent(model=Model.O1_MINI_LATEST) +async def handle_technical_query(_: ResponseInput) -> ResponseOutput: + """Technical support specialist providing troubleshooting assistance.""" + ... + + +class QueryResult(TypedDict): + response: str + classification: ClassificationOutput + + +async def handle_customer_query(query: str) -> QueryResult: + """ + Handle a customer query through a workflow: + 1. Classify the query type and complexity + 2. Route to appropriate specialized agent + 3. Return response and classification details + """ + # Step 1: Classify the query + classification = await classify_query_agent(ClassificationInput(query=query)) + + # Step 2: Route to appropriate handler based on type and complexity + handlers = { + (QueryType.GENERAL, ComplexityLevel.SIMPLE): handle_general_query, + (QueryType.GENERAL, ComplexityLevel.COMPLEX): handle_general_query, + (QueryType.REFUND, ComplexityLevel.SIMPLE): handle_refund_query, + (QueryType.REFUND, ComplexityLevel.COMPLEX): handle_refund_query, + (QueryType.TECHNICAL, ComplexityLevel.SIMPLE): handle_technical_query, + (QueryType.TECHNICAL, ComplexityLevel.COMPLEX): handle_technical_query, + } + + # Get appropriate handler + handler = handlers[(classification.type, classification.complexity)] + + # Generate response + result = await handler( + ResponseInput( + query=query, + query_type=classification.type, + ), + ) + + return { + "response": result.response, + "classification": classification, + } + + +if __name__ == "__main__": + query = "I'm having trouble logging into my account." + " It keeps saying invalid password even though I'm sure it's correct." + result = asyncio.run(handle_customer_query(query)) + + print("\n=== Customer Query ===") + print(query) + + print("\n=== Classification ===") + print(f"Type: {result['classification'].type}") + print(f"Complexity: {result['classification'].complexity}") + print(f"Reasoning: {result['classification'].reasoning}") + + print("\n=== Response ===") + print(result["response"]) + print()