diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 9fa825e..7fcbc05 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -38,7 +38,14 @@ poetry run pytest tests/e2e #### Configuring VSCode -Suggested extensions are available in the [.vscode/extensions.json](.vscode/extensions.json) file. +Suggested extensions are available in the [.vscode/extensions.json](.vscode/extensions.json) file. When you open this project in VSCode or Cursor, you'll be prompted to install these recommended extensions automatically. + +To manually install recommended extensions: +1. Open VSCode/Cursor Command Palette (Cmd/Ctrl + Shift + P) +2. Type "Show Recommended Extensions" +3. Install the ones marked with @recommended + +These extensions will help ensure consistent code quality and style across all contributors. ### Dependencies diff --git a/README.md b/README.md index 026bf58..5fab7f1 100644 --- a/README.md +++ b/README.md @@ -64,7 +64,7 @@ client = WorkflowAI( # Use the client to create and run agents @client.agent() -def my_agent(task_input: Input) -> Output: +def my_agent(agent_input: Input) -> Output: ... ``` @@ -154,16 +154,16 @@ feedback_input = CallFeedbackInput( ) # Analyze the feedback -result = await analyze_call_feedback(feedback_input) +run = await analyze_call_feedback(feedback_input) # Print the analysis print("\nPositive Points:") -for point in result.positive_points: +for point in run.positive_points: print(f"\n• {point.point}") print(f" Quote [{point.timestamp}]: \"{point.quote}\"") print("\nNegative Points:") -for point in result.negative_points: +for point in run.negative_points: print(f"\n• {point.point}") print(f" Quote [{point.timestamp}]: \"{point.quote}\"") ``` @@ -354,7 +354,33 @@ An example of using a PDF as input is available in [pdf_answer.py](./examples/pd ### Audio -[todo] +Use the `File` class to pass audio files as input to an agent. Note that only some models support audio input. + +```python +from workflowai.fields import File +... 
+ +class AudioInput(BaseModel): + audio: File = Field(description="The audio recording to analyze for spam/robocall detection") + +class AudioClassification(BaseModel): + is_spam: bool = Field(description="Whether the audio is classified as spam/robocall") + +@workflowai.agent(id="audio-classifier", model=Model.GEMINI_1_5_FLASH_LATEST) +async def classify_audio(input: AudioInput) -> AudioClassification: + ... + +# Example 1: Using base64 encoded data +audio = File(content_type='audio/mp3', data='') + +# Example 2: Using a URL +# audio = File(url='https://example.com/audio/call.mp3') + +run = await classify_audio(AudioInput(audio=audio)) +print(run) +``` + +See an example of audio classification in [04_audio_classifier_agent.py](./examples/04_audio_classifier_agent.py). ### Caching @@ -590,7 +616,7 @@ async def analyze_call_feedback_strict(input: CallFeedbackInput) -> CallFeedback ... try: - result = await analyze_call_feedback_strict( + run = await analyze_call_feedback_strict( CallFeedbackInput( transcript="[00:01:15] Customer: The product is great!", call_date=date(2024, 1, 15) ) @@ -608,13 +634,13 @@ async def analyze_call_feedback_tolerant(input: CallFeedbackInput) -> CallFeedba ... # The invalid_generation is less likely -result = await analyze_call_feedback_tolerant( +run = await analyze_call_feedback_tolerant( CallFeedbackInput( transcript="[00:01:15] Customer: The product is great!", call_date=date(2024, 1, 15) ) ) -if not result.positive_points and not result.negative_points: +if not run.positive_points and not run.negative_points: print("No feedback points were generated!") ``` @@ -630,15 +656,14 @@ absent will cause `AttributeError` when queried. async def analyze_call_feedback_stream(input: CallFeedbackInput) -> AsyncIterator[CallFeedbackOutput]: ... 
-async for result in analyze_call_feedback_stream( +async for run in analyze_call_feedback_stream( CallFeedbackInput( transcript="[00:01:15] Customer: The product is great!", call_date=date(2024, 1, 15) ) ): - # With default values, we can safely check the points as they stream in - print(f"Positive points so far: {len(result.positive_points)}") - print(f"Negative points so far: {len(result.negative_points)}") + print(f"Positive points so far: {len(run.positive_points)}") + print(f"Negative points so far: {len(run.negative_points)}") ``` #### Field properties diff --git a/examples/01_basic_agent.py b/examples/01_basic_agent.py new file mode 100644 index 0000000..239ed4b --- /dev/null +++ b/examples/01_basic_agent.py @@ -0,0 +1,75 @@ +""" +This example demonstrates how to create a basic WorkflowAI agent that takes a city name +and returns information about the capital of its country. It showcases: + +1. Basic agent creation with input/output models +2. Field descriptions and examples +3. 
Cost and latency tracking +""" + +import asyncio + +from pydantic import BaseModel, Field + +import workflowai +from workflowai import Model, Run + + +class CityInput(BaseModel): + """Input model for the city-to-capital agent.""" + city: str = Field( + description="The name of the city for which to find the country's capital", + examples=["Paris", "New York", "Tokyo"], + ) + + +class CapitalOutput(BaseModel): + """Output model containing information about the capital city.""" + country: str = Field( + description="The country where the input city is located", + examples=["France", "United States", "Japan"], + ) + capital: str = Field( + description="The capital city of the country", + examples=["Paris", "Washington D.C.", "Tokyo"], + ) + fun_fact: str = Field( + description="An interesting fact about the capital city", + examples=["Paris has been the capital of France since 508 CE"], + ) + + +@workflowai.agent( + id="city-to-capital", + model=Model.CLAUDE_3_5_SONNET_LATEST, +) +async def get_capital_info(city_input: CityInput) -> Run[CapitalOutput]: + """ + Find the capital city of the country where the input city is located. + + Guidelines: + 1. First identify the country where the input city is located + 2. Then provide the capital city of that country + 3. Include an interesting historical or cultural fact about the capital + 4. Be accurate and precise with geographical information + 5. If the input city is itself the capital, still provide the information + """ + ... 
+ + +async def main(): + # Example 1: Basic usage with Paris + print("\nExample 1: Basic usage with Paris") + print("-" * 50) + run = await get_capital_info(CityInput(city="Paris")) + print(run) + + # Example 2: Using Tokyo + print("\nExample 2: Using Tokyo") + print("-" * 50) + run = await get_capital_info(CityInput(city="Tokyo")) + print(run) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/03_caching.py b/examples/03_caching.py new file mode 100644 index 0000000..e55d85c --- /dev/null +++ b/examples/03_caching.py @@ -0,0 +1,159 @@ +""" +This example demonstrates the different caching options in WorkflowAI: +1. 'auto' - Cache only when temperature is 0 (default) +2. 'always' - Always use cache if available +3. 'never' - Never use cache, always execute new runs + +The example uses a medical SOAP notes extractor to show how caching affects: +- Response consistency (important for medical documentation) +- Cost efficiency +- Execution time +""" + +import asyncio +import time +from typing import Literal, TypedDict + +from pydantic import BaseModel, Field + +import workflowai +from workflowai import Model, Run + +# Import CacheUsage type +CacheUsage = Literal["auto", "always", "never"] + + +class SOAPInput(BaseModel): + """Input containing a medical consultation transcript.""" + transcript: str = Field( + description="The medical consultation transcript to analyze", + ) + + +class SOAPNote(BaseModel): + """Structured SOAP note components.""" + subjective: list[str] = Field( + description="Patient's symptoms, complaints, and history as reported", + examples=["Patient reports severe headache for 3 days", "Denies fever or nausea"], + ) + objective: list[str] = Field( + description="Observable, measurable findings from examination", + examples=["BP 120/80", "Temperature 98.6°F", "No visible inflammation"], + ) + assessment: list[str] = Field( + description="Diagnosis or clinical impressions", + examples=["Tension headache", "Rule out migraine"], 
+ ) + plan: list[str] = Field( + description="Treatment plan and next steps", + examples=["Prescribed ibuprofen 400mg", "Follow up in 2 weeks"], + ) + + +@workflowai.agent( + id="soap-extractor", + model=Model.LLAMA_3_3_70B, +) +async def extract_soap_notes(soap_input: SOAPInput) -> Run[SOAPNote]: + """ + Extract SOAP notes from a medical consultation transcript. + + Guidelines: + 1. Analyze the transcript to identify key medical information + 2. Organize information into SOAP format: + - Subjective: Patient's symptoms, complaints, and history + - Objective: Physical examination findings and test results + - Assessment: Diagnosis or clinical impression + - Plan: Treatment plan and next steps + + 3. Be thorough but concise + 4. Use medical terminology appropriately + 5. Include all relevant information from the transcript + """ + ... + + +class ResultMetrics(TypedDict): + option: str + duration: float + cost: float + + +async def demonstrate_caching(transcript: str): + """Run the same transcript with different caching options and compare results.""" + + print("\nComparing caching options") + print("-" * 50) + + cache_options: list[CacheUsage] = ["auto", "always", "never"] + results: list[ResultMetrics] = [] + + for cache_option in cache_options: + start_time = time.time() + + run = await extract_soap_notes( + SOAPInput(transcript=transcript), + use_cache=cache_option, + ) + + duration = time.time() - start_time + + # Store metrics for comparison + results.append({ + "option": cache_option, + "duration": duration, + "cost": float(run.cost_usd or 0.0), # Convert None to 0.0 + }) + + # Print comparison table + print("\nResults Comparison:") + print("-" * 50) + print(f"{'Cache Option':<12} {'Duration':<10} {'Cost':<8}") + print("-" * 50) + + for r in results: + print( + f"{r['option']:<12} " + f"{r['duration']:.2f}s{'*' if r['duration'] < 0.1 else '':<8} " + f"${r['cost']:<7}", + ) + + print("-" * 50) + print("* Very fast response indicates cached result") + + +async 
def main(): + # Example medical consultation transcript + transcript = """ + Patient is a 45-year-old female presenting with severe headache for the past 3 days. + She describes the pain as throbbing, primarily on the right side of her head. + Pain level reported as 7/10. Denies fever, nausea, or visual disturbances. + Previous history of migraines, but states this feels different. + + Vital signs stable: BP 120/80, HR 72, Temp 98.6°F. + Physical exam shows mild tenderness in right temporal area. + No neurological deficits noted. + Eye examination normal, no papilledema. + + Assessment suggests tension headache, but need to rule out migraine. + No red flags for secondary causes identified. + + Plan: Prescribed ibuprofen 400mg q6h for pain. + Recommended stress reduction techniques. + Patient education provided regarding headache triggers. + Follow up in 2 weeks, sooner if symptoms worsen. + Return precautions discussed. + """ + + print("\nDemonstrating different caching options") + print("=" * 50) + print("This example shows how caching affects the agent's behavior:") + print("- 'auto': Caches only when temperature is 0 (default)") + print("- 'always': Reuses cached results when available") + print("- 'never': Generates new results every time") + + await demonstrate_caching(transcript) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/04_audio_classifier_agent.py b/examples/04_audio_classifier_agent.py new file mode 100644 index 0000000..8bf5a19 --- /dev/null +++ b/examples/04_audio_classifier_agent.py @@ -0,0 +1,129 @@ +""" +This example demonstrates how to use WorkflowAI to analyze audio files. +Specifically, it shows how to: +1. Pass audio files as input to an agent +2. Analyze the audio content for robocall/spam detection +3. 
Get a structured classification with confidence score and reasoning +""" + +import asyncio +import base64 +import os + +from pydantic import BaseModel, Field # pyright: ignore [reportUnknownVariableType] + +import workflowai +from workflowai import Model, Run +from workflowai.fields import File + + +class AudioInput(BaseModel): + """Input containing the audio file to analyze.""" + audio: File = Field( + description="The audio recording to analyze for spam/robocall detection", + ) + + +class SpamIndicator(BaseModel): + """A specific indicator that suggests the call might be spam.""" + description: str = Field( + description="Description of the spam indicator found in the audio", + examples=[ + "Uses urgency to pressure the listener", + "Mentions winning a prize without entering a contest", + "Automated/robotic voice detected", + ], + ) + quote: str = Field( + description="The exact quote or timestamp where this indicator appears", + examples=[ + "'You must act now before it's too late'", + "'You've been selected as our prize winner'", + "0:05-0:15 - Synthetic voice pattern detected", + ], + ) + + +class AudioClassification(BaseModel): + """Output containing the spam classification results.""" + is_spam: bool = Field( + description="Whether the audio is classified as spam/robocall", + ) + confidence_score: float = Field( + description="Confidence score for the classification (0.0 to 1.0)", + ge=0.0, + le=1.0, + ) + spam_indicators: list[SpamIndicator] = Field( + default_factory=list, + description="List of specific indicators that suggest this is spam", + ) + reasoning: str = Field( + description="Detailed explanation of why this was classified as spam or legitimate", + ) + + +@workflowai.agent( + id="audio-spam-detector", + model=Model.GEMINI_1_5_FLASH_LATEST, +) +async def classify_audio(audio_input: AudioInput) -> Run[AudioClassification]: + """ + Analyze the audio recording to determine if it's a spam/robocall. + + Guidelines: + 1. 
Listen for common spam/robocall indicators: + - Use of urgency or pressure tactics + - Unsolicited offers or prizes + - Automated/synthetic voices + - Requests for personal/financial information + - Impersonation of legitimate organizations + + 2. Consider both content and delivery: + - What is being said (transcribe key parts) + - How it's being said (tone, pacing, naturalness) + - Background noise and call quality + + 3. Provide clear reasoning: + - Cite specific examples from the audio + - Explain confidence level + - Note any uncertainty + """ + ... + + +async def main(): + # Example: Load an audio file from the assets directory + current_dir = os.path.dirname(os.path.abspath(__file__)) + audio_path = os.path.join(current_dir, "assets", "call.mp3") + + # Verify the file exists + if not os.path.exists(audio_path): + raise FileNotFoundError( + f"Audio file not found at {audio_path}. " + "Please make sure you have the example audio file in the correct location.", + ) + + # Example 1: Using a local file (base64 encoded) + with open(audio_path, "rb") as f: # noqa: ASYNC230 + audio_data = f.read() + + audio = File( + content_type="audio/mp3", + data=base64.b64encode(audio_data).decode(), + ) + + # Example 2: Using a URL instead of base64 (commented out) + # audio = File( + # url="https://example.com/audio/call.mp3" + # ) + + # Classify the audio + run = await classify_audio(AudioInput(audio=audio)) + + # Print results including cost and latency information + print(run) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/05_browser_text_uptime_agent.py b/examples/05_browser_text_uptime_agent.py new file mode 100644 index 0000000..4e47624 --- /dev/null +++ b/examples/05_browser_text_uptime_agent.py @@ -0,0 +1,78 @@ +""" +This example demonstrates how to use the @browser-text tool to fetch and analyze +uptime data from API status pages. It shows how to: +1. Use @browser-text to fetch status page content +2. 
Extract uptime percentage for specific services +3. Handle different status page formats +""" + +import asyncio + +from pydantic import BaseModel, Field # pyright: ignore [reportUnknownVariableType] + +import workflowai +from workflowai import Model, Run + + +class UptimeInput(BaseModel): + """Input for checking API uptime.""" + status_page_url: str = Field( + description="URL of the status page to check", + examples=["https://status.openai.com", "https://status.anthropic.com"], + ) + service_name: str = Field( + description="Name of the specific API service to check", + examples=["API", "Chat Completions", "Embeddings"], + ) + + +class UptimeOutput(BaseModel): + """Output containing uptime percentage.""" + uptime_percentage: float = Field( + description="The uptime percentage for the specified service over the last 30 days", + examples=[99.99, 98.5], + ge=0.0, + le=100.0, + ) + + +@workflowai.agent( + id="uptime-checker", + model=Model.GPT_4O_MINI_LATEST, +) +async def check_uptime(uptime_input: UptimeInput, use_cache: str = "never") -> Run[UptimeOutput]: + """ + Fetch and analyze uptime data from an API status page. + Use @browser-text to get the page content. + + Guidelines: + 1. Visit the provided status page URL + 2. Find the specified service's uptime information + 3. Extract the most recent uptime percentage available + 4. Return just the percentage as a number between 0 and 100 + + Focus on finding the most recent uptime data available. + This could be daily, weekly, monthly or any other time period shown. + """ + ... 
+ + +async def main(): + # Example: Check OpenAI API uptime + uptime_input = UptimeInput( + status_page_url="https://status.openai.com", + service_name="API", + ) + + print(f"\nChecking uptime for {uptime_input.service_name} at {uptime_input.status_page_url}...") + print("-" * 50) + + # Get uptime data with caching disabled + run = await check_uptime(uptime_input, use_cache="never") + + # Print the run + print(run) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/streaming_summary.py b/examples/06_streaming_summary.py similarity index 92% rename from examples/streaming_summary.py rename to examples/06_streaming_summary.py index cd6b4f1..557cb3b 100644 --- a/examples/streaming_summary.py +++ b/examples/06_streaming_summary.py @@ -1,3 +1,10 @@ +""" +This example demonstrates how to use streaming with WorkflowAI agents. It shows how to: +1. Stream outputs as they are generated +2. Get real-time updates during processing +3. Access run metadata like cost and duration +""" + import asyncio from collections.abc import AsyncIterator diff --git a/examples/images/city_identifier.py b/examples/07_image_agent.py similarity index 90% rename from examples/images/city_identifier.py rename to examples/07_image_agent.py index 4f279db..6a324ee 100644 --- a/examples/images/city_identifier.py +++ b/examples/07_image_agent.py @@ -1,3 +1,10 @@ +""" +This example demonstrates how to use images with WorkflowAI agents. It shows how to: +1. Pass image inputs to an agent +2. Analyze city photos for identification +3. Structure detailed visual analysis results +""" + import asyncio import os from typing import Optional @@ -42,7 +49,7 @@ async def identify_city_from_image(_: ImageInput) -> Run[ImageOutput]: ... 
-async def run_city_identifier(): +async def main(): current_dir = os.path.dirname(os.path.abspath(__file__)) image_path = os.path.join(current_dir, "assets", "new-york-city.jpg") @@ -67,8 +74,7 @@ async def run_city_identifier(): print(f"Cost: ${agent_run.cost_usd:.10f}") print(f"Latency: {agent_run.duration_seconds:.2f}s") - # using URL for Image - # TODO: replace with a Github URL + # Example using URL for Image image_url = "https://t4.ftcdn.net/jpg/02/96/15/35/360_F_296153501_B34baBHDkFXbl5RmzxpiOumF4LHGCvAE.jpg" image = Image(url=image_url) agent_run = await identify_city_from_image( @@ -83,4 +89,4 @@ async def run_city_identifier(): if __name__ == "__main__": load_dotenv(override=True) - asyncio.run(run_city_identifier()) + asyncio.run(main()) diff --git a/examples/pdfs/pdf_answer.py b/examples/08_pdf_agent.py similarity index 100% rename from examples/pdfs/pdf_answer.py rename to examples/08_pdf_agent.py diff --git a/examples/reply/name_extractor.py b/examples/09_reply.py similarity index 67% rename from examples/reply/name_extractor.py rename to examples/09_reply.py index f61e111..f5bdfed 100644 --- a/examples/reply/name_extractor.py +++ b/examples/09_reply.py @@ -1,3 +1,19 @@ +""" +This example demonstrates how to use the reply() method to have a conversation with the agent/LLM. +After getting an initial response, you can use reply() to ask follow-up questions or request +confirmation. The agent/LLM maintains context from the previous interaction, allowing it to: + +1. Confirm its previous output +2. Correct mistakes if needed +3. Provide additional explanation +4. Refine its response based on new information + +Example: + run = await my_agent(input) # Initial response + run = await run.reply(user_message="Are you sure?") # Ask for confirmation + ... 
+""" + import asyncio from dotenv import load_dotenv @@ -52,7 +68,10 @@ async def main(): print(f"Extracted: {run.output.first_name} {run.output.last_name}") - # Double check with a simple confirmation + # The reply() method allows you to continue the conversation with the LLM + # by sending a follow-up message. The LLM will maintain context from the + # previous interaction and can confirm or revise its previous output. + # Here we ask it to double check its extraction. run = await run.reply(user_message="Are you sure?") print("\nAfter double-checking:") diff --git a/examples/README.md b/examples/README.md index cf71b4e..489b627 100644 --- a/examples/README.md +++ b/examples/README.md @@ -2,31 +2,120 @@ This directory contains example agents demonstrating different capabilities of the WorkflowAI SDK. -## Image Analysis Examples +## Basic Examples -### City Identifier -[city_identifier.py](./images/city_identifier.py) +### 1. Basic Agent +[01_basic_agent.py](./01_basic_agent.py) + +A simple agent that takes a city name and returns information about its country's capital. Demonstrates: +- Basic agent creation with input/output models +- Field descriptions and examples +- Cost and latency tracking + +### 2. Agent with Tools +[02_agent_with_tools.py](./02_agent_with_tools.py) + +Shows how to create agents that can use tools to enhance their capabilities: +- Using hosted tools (@browser-text, @search) +- Creating custom tools +- Handling tool responses + +### 3. Caching Behavior +[03_caching.py](./03_caching.py) + +Demonstrates different caching options in WorkflowAI using a medical SOAP notes extractor: +- 'auto' - Cache only when temperature is 0 (default) +- 'always' - Always use cache if available +- 'never' - Never use cache, always execute new runs + +## Media Analysis Examples + +### 4. Audio Analysis +[04_audio_classifier_agent.py](./04_audio_classifier_agent.py) + +An agent that analyzes audio recordings for spam/robocall detection. 
Demonstrates: +- Using audio files as input +- Structured classification with confidence scores +- Detailed reasoning about audio content + +Required asset: +- `assets/call.mp3` - Example audio file for spam detection + +### 5. Browser Text Tool +[05_browser_text_uptime_agent.py](./05_browser_text_uptime_agent.py) + +Demonstrates using the `@browser-text` tool to: +- Fetch content from status pages +- Extract uptime percentages +- Handle different page formats +- Process real-time web data + +### 6. Streaming Example +[06_streaming_summary.py](./06_streaming_summary.py) + +Shows how to use streaming to get real-time output from agents: +- Stream translations as they're generated +- Track progress with chunk numbers +- Access run metadata (cost, latency) + +### 7. Image Analysis +[07_image_agent.py](./07_image_agent.py) An agent that identifies cities from images. Given a photo of a city, it: - Identifies the city and country - Explains the reasoning behind the identification -- Lists key landmarks or architectural features visible in the image +- Lists key landmarks or architectural features - Provides confidence level in the identification -Uses the `Image` field type to handle image inputs and Claude 3.5 Sonnet for its strong visual analysis capabilities. +Required asset: +- `assets/new-york-city.jpg` - Example city image for identification -## Document Analysis Examples - -### PDF Question Answering -[pdf_answer.py](./pdf_answer.py) +### 8. PDF Analysis +[08_pdf_agent.py](./08_pdf_agent.py) An agent that answers questions about PDF documents. Given a PDF and a question, it: - Analyzes the PDF content -- Provides a clear and concise answer to the question +- Provides a clear and concise answer - Includes relevant quotes from the document to support its answer -Uses the `File` field type to handle PDF inputs and Claude 3.5 Sonnet for its strong document comprehension abilities. 
+Required asset: +- `assets/sec-form-4.pdf` - Example SEC form for document analysis ## Workflow Pattern Examples -For examples of different workflow patterns (chains, routing, parallel processing, etc.), see the [workflows](./workflows) directory. \ No newline at end of file +The [workflows](./workflows) directory contains examples of different workflow patterns for complex tasks: + +1. **Chain of Agents** - Process long documents by breaking them into chunks +2. **Routing** - Direct queries to specialized agents based on type +3. **Parallel Processing** - Run multiple analyses concurrently +4. **Orchestrator-Worker** - Coordinate multiple agents for complex tasks + +See [workflows/README.md](./workflows/README.md) for detailed information about each pattern. + +## Asset Dependencies + +All example assets should be placed in the `assets/` directory inside `examples/`: + +``` +examples/ +├── assets/ +│ ├── call.mp3 # For audio classifier +│ ├── new-york-city.jpg # For image agent +│ └── sec-form-4.pdf # For PDF agent +├── 01_basic_agent.py +├── 02_agent_with_tools.py +├── 03_caching.py +└── ... +``` + +## Running the Examples + +1. Make sure you have the required assets in the `examples/assets/` directory +2. Set up your environment variables in `.env`: + ``` + WORKFLOWAI_API_KEY=your_api_key + ``` +3. 
Run any example using Python: + ```bash + python examples/01_basic_agent.py + ``` \ No newline at end of file diff --git a/examples/assets/call.mp3 b/examples/assets/call.mp3 new file mode 100644 index 0000000..2093286 Binary files /dev/null and b/examples/assets/call.mp3 differ diff --git a/examples/images/assets/new-york-city.jpg b/examples/assets/new-york-city.jpg similarity index 100% rename from examples/images/assets/new-york-city.jpg rename to examples/assets/new-york-city.jpg diff --git a/examples/images/assets/paris.jpg b/examples/assets/paris.jpg similarity index 100% rename from examples/images/assets/paris.jpg rename to examples/assets/paris.jpg diff --git a/examples/pdfs/assets/sec-form-4.pdf b/examples/assets/sec-form-4.pdf similarity index 100% rename from examples/pdfs/assets/sec-form-4.pdf rename to examples/assets/sec-form-4.pdf diff --git a/examples/workflows/chain_of_agents.py b/examples/workflows/chain_of_agents.py index 6bdd169..56aec25 100644 --- a/examples/workflows/chain_of_agents.py +++ b/examples/workflows/chain_of_agents.py @@ -183,20 +183,20 @@ async def main(): query = "What were the major environmental impacts of the Industrial Revolution?" 
- result = await process_long_document(document, query) + run = await process_long_document(document, query) print("\n=== Query ===") print(query) print("\n=== Answer ===") - print(result.answer) + print(run.answer) print("\n=== Reasoning ===") - print(result.reasoning) + print(run.reasoning) - if result.supporting_evidence: + if run.supporting_evidence: print("\n=== Supporting Evidence ===") - for evidence in result.supporting_evidence: + for evidence in run.supporting_evidence: print(f"- {evidence}") diff --git a/tests/e2e/readme_examples_test.py b/tests/e2e/readme_examples_test.py new file mode 100644 index 0000000..b560e15 --- /dev/null +++ b/tests/e2e/readme_examples_test.py @@ -0,0 +1,184 @@ +"""Tests to verify that the code examples in the README.md work as expected.""" + +import base64 +from collections.abc import AsyncIterator +from datetime import date +from pathlib import Path + +import pytest +from pydantic import BaseModel, Field # pyright: ignore [reportUnknownVariableType] + +import workflowai +from workflowai import Model, Run +from workflowai.fields import File + + +# Input model for the call feedback analysis +class CallFeedbackInput(BaseModel): + """Input for analyzing a customer feedback call.""" + transcript: str = Field(description="The full transcript of the customer feedback call.") + call_date: date = Field(description="The date when the call took place.") + + +# Model representing a single feedback point with supporting evidence +class FeedbackPoint(BaseModel): + """A specific feedback point with its supporting quote.""" + point: str = Field(description="The main point or insight from the feedback.") + quote: str = Field(description="The exact quote from the transcript supporting this point.") + timestamp: str = Field(description="The timestamp or context of when this was mentioned in the call.") + + +# Model representing the structured analysis of the customer feedback call +class CallFeedbackOutput(BaseModel): + """Structured analysis of 
the customer feedback call.""" + positive_points: list[FeedbackPoint] = Field( + default_factory=list, + description="List of positive feedback points, each with a supporting quote.", + ) + negative_points: list[FeedbackPoint] = Field( + default_factory=list, + description="List of negative feedback points, each with a supporting quote.", + ) + + +@workflowai.agent(id="analyze-call-feedback", model=Model.GPT_4O_MINI_LATEST) +async def analyze_call_feedback(feedback_input: CallFeedbackInput) -> Run[CallFeedbackOutput]: + """ + Analyze a customer feedback call transcript to extract key insights: + 1. Identify positive feedback points with supporting quotes + 2. Identify negative feedback points with supporting quotes + 3. Include timestamp/context for each point + + Be specific and objective in the analysis. Use exact quotes from the transcript. + Maintain the customer's original wording in quotes. + """ + ... + + +@pytest.mark.asyncio +async def test_call_feedback_analysis(): + """Test the call feedback analysis example from the README.""" + # Example transcript from README + transcript = """ + [00:01:15] Customer: I've been using your software for about 3 months now, and I have to say\ + the new dashboard feature is really impressive. It's saving me at least an hour each day on reporting. + + [00:02:30] Customer: However, I'm really frustrated with the export functionality. It crashed twice\ + this week when I tried to export large reports, and I lost all my work. + + [00:03:45] Customer: On a positive note, your support team, especially Sarah, was very responsive\ + when I reported the issue. She got back to me within minutes. + + [00:04:30] Customer: But I think the pricing for additional users is a bit steep compared to other\ + solutions we looked at. 
+ """ + + # Create input + feedback_input = CallFeedbackInput( + transcript=transcript, + call_date=date(2024, 1, 15), + ) + + # Analyze the feedback + run = await analyze_call_feedback(feedback_input) + + # Verify the run object has expected attributes + assert run.output is not None + assert run.model is not None + assert run.cost_usd is not None + assert run.duration_seconds is not None + + # Verify we got both positive and negative points + assert len(run.output.positive_points) > 0 + assert len(run.output.negative_points) > 0 + + # Verify each point has the required fields + for point in run.output.positive_points + run.output.negative_points: + assert point.point + assert point.quote + assert point.timestamp + + +@workflowai.agent(id="analyze-call-feedback-stream") +def analyze_call_feedback_stream(feedback_input: CallFeedbackInput) -> AsyncIterator[Run[CallFeedbackOutput]]: + """Same as analyze_call_feedback but with streaming enabled.""" + ... + + +@pytest.mark.asyncio +async def test_streaming_example(): + """Test the streaming example from the README.""" + feedback_input = CallFeedbackInput( + transcript="[00:01:15] Customer: The product is great!", + call_date=date(2024, 1, 15), + ) + + last_chunk = None + chunks_received = 0 + + async for chunk in analyze_call_feedback_stream(feedback_input): + # Verify each chunk has an output + assert chunk.output is not None + chunks_received += 1 + last_chunk = chunk + + # Verify we received at least one chunk + assert chunks_received > 0 + + # Verify the last chunk has cost and duration + assert last_chunk is not None + assert last_chunk.cost_usd is not None + assert last_chunk.duration_seconds is not None + + +# Models for the audio example from README +class AudioInput(BaseModel): + """Input containing the audio file to analyze.""" + audio: File = Field(description="The audio recording to analyze for spam/robocall detection") + + +class AudioClassification(BaseModel): + """Output containing the spam 
classification results.""" + is_spam: bool = Field(description="Whether the audio is classified as spam/robocall") + + +@workflowai.agent(id="audio-classifier", model=Model.GEMINI_1_5_FLASH_LATEST) +async def classify_audio(audio_input: AudioInput) -> Run[AudioClassification]: + """ + Analyze the audio recording to determine if it's a spam/robocall. + """ + ... + + +@pytest.mark.asyncio +async def test_audio_classification(): + """Test the audio classification example from the README.""" + # Load the example audio file + audio_path = Path("examples/assets/call.mp3") + + # Skip test if audio file doesn't exist + if not audio_path.exists(): + pytest.skip(f"Audio file not found at {audio_path}") + + # Note: Using sync file read in test code is acceptable. + # We avoid adding aiofiles dependency just for this test case. + with open(audio_path, "rb") as f: # noqa: ASYNC230 + audio_data = f.read() + + # Create audio input (base64 encoded) + audio = File( + content_type="audio/mp3", + data=base64.b64encode(audio_data).decode(), + ) + + # Run the classification + run = await classify_audio(AudioInput(audio=audio)) + + # Verify the run object has expected attributes + assert run.output is not None + assert run.model is not None + assert run.cost_usd is not None + assert run.duration_seconds is not None + + # Verify we got a classification + assert isinstance(run.output.is_spam, bool) diff --git a/workflowai/core/domain/run.py b/workflowai/core/domain/run.py index c2e895d..3b2281a 100644 --- a/workflowai/core/domain/run.py +++ b/workflowai/core/domain/run.py @@ -86,6 +86,44 @@ def model(self): return None return self.version.properties.model + def format_output(self) -> str: + """Format the run output as a string. + + Returns a formatted string containing: + 1. The output as a nicely formatted JSON object + 2. The cost with $ prefix (if available) + 3. 
The latency with 2 decimal places and 's' suffix (if available) + + Example: + Output: + ================================================== + { + "message": "hello" + } + ================================================== + Cost: $ 0.001 + Latency: 1.23s + """ + # Format the output string + output = [ + "\nOutput:", + "=" * 50, + self.output.model_dump_json(indent=2), + "=" * 50, + ] + + # Add run information if available + if self.cost_usd is not None: + output.append(f"Cost: $ {self.cost_usd}") + if self.duration_seconds is not None: + output.append(f"Latency: {self.duration_seconds:.2f}s") + + return "\n".join(output) + + def __str__(self) -> str: + """Return a string representation of the run.""" + return self.format_output() + class _AgentBase(Protocol, Generic[AgentOutput]): async def reply( diff --git a/workflowai/core/domain/run_test.py b/workflowai/core/domain/run_test.py index 31aa347..34c65ce 100644 --- a/workflowai/core/domain/run_test.py +++ b/workflowai/core/domain/run_test.py @@ -45,3 +45,50 @@ def test_different_agents(self, run1: Run[_TestOutput], run2: Run[_TestOutput]): run2._agent = Mock() # pyright: ignore [reportPrivateUsage] assert run1._agent != run2._agent, "sanity check" # pyright: ignore [reportPrivateUsage] assert run1 == run2 + + +# Test that format_output correctly formats: +# 1. The output as a JSON object +# 2. The cost with $ prefix and correct precision +# 3. The latency with 2 decimal places and 's' suffix +def test_format_output() -> None: + run = Run[_TestOutput]( + id="test-id", + agent_id="agent-1", + schema_id=1, + output=_TestOutput(message="hello"), + duration_seconds=1.23, + cost_usd=0.001, + ) + + expected = """\nOutput: +================================================== +{ + "message": "hello" +} +================================================== +Cost: $ 0.001 +Latency: 1.23s""" + + assert run.format_output() == expected + + +# Test that format_output works correctly when cost and latency are not provided: +# 1. 
The output is still formatted as a JSON object +# 2. No cost or latency lines are included in the output +def test_format_output_no_cost_latency() -> None: + run = Run[_TestOutput]( + id="test-id", + agent_id="agent-1", + schema_id=1, + output=_TestOutput(message="hello"), + ) + + expected = """\nOutput: +================================================== +{ + "message": "hello" +} +==================================================""" + + assert run.format_output() == expected