Skip to content
9 changes: 8 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,14 @@ poetry run pytest tests/e2e

#### Configuring VSCode

Suggested extensions are available in the [.vscode/extensions.json](.vscode/extensions.json) file.
Suggested extensions are available in the [.vscode/extensions.json](.vscode/extensions.json) file. When you open this project in VSCode or Cursor, you'll be prompted to install these recommended extensions automatically.

To manually install recommended extensions:
1. Open VSCode/Cursor Command Palette (Cmd/Ctrl + Shift + P)
2. Type "Show Recommended Extensions"
3. Install the ones marked with @recommended

These extensions will help ensure consistent code quality and style across all contributors.

### Dependencies

Expand Down
49 changes: 37 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ client = WorkflowAI(

# Use the client to create and run agents
@client.agent()
def my_agent(task_input: Input) -> Output:
def my_agent(agent_input: Input) -> Output:
...
```

Expand Down Expand Up @@ -154,16 +154,16 @@ feedback_input = CallFeedbackInput(
)

# Analyze the feedback
result = await analyze_call_feedback(feedback_input)
run = await analyze_call_feedback(feedback_input)

# Print the analysis
print("\nPositive Points:")
for point in result.positive_points:
for point in run.positive_points:
print(f"\n• {point.point}")
print(f" Quote [{point.timestamp}]: \"{point.quote}\"")

print("\nNegative Points:")
for point in result.negative_points:
for point in run.negative_points:
print(f"\n• {point.point}")
print(f" Quote [{point.timestamp}]: \"{point.quote}\"")
```
Expand Down Expand Up @@ -354,7 +354,33 @@ An example of using a PDF as input is available in [pdf_answer.py](./examples/pd

### Audio

[todo]
Use the `File` class to pass audio files as input to an agent. Note that only some models support audio input.

```python
from workflowai.fields import File
...

class AudioInput(BaseModel):
audio: File = Field(description="The audio recording to analyze for spam/robocall detection")

class AudioClassification(BaseModel):
is_spam: bool = Field(description="Whether the audio is classified as spam/robocall")

@workflowai.agent(id="audio-classifier", model=Model.GEMINI_1_5_FLASH_LATEST)
async def classify_audio(input: AudioInput) -> AudioClassification:
...

# Example 1: Using base64 encoded data
audio = File(content_type='audio/mp3', data='<base 64 encoded data>')

# Example 2: Using a URL
# audio = File(url='https://example.com/audio/call.mp3')

run = await classify_audio(AudioInput(audio=audio))
print(run)
```

See an example of audio classification in [audio_classifier.py](./examples/04_audio_classifier.py).

### Caching

Expand Down Expand Up @@ -590,7 +616,7 @@ async def analyze_call_feedback_strict(input: CallFeedbackInput) -> CallFeedback
...

try:
result = await analyze_call_feedback_strict(
run = await analyze_call_feedback_strict(
CallFeedbackInput(
transcript="[00:01:15] Customer: The product is great!",
call_date=date(2024, 1, 15)
Expand All @@ -608,13 +634,13 @@ async def analyze_call_feedback_tolerant(input: CallFeedbackInput) -> CallFeedba
...

# The invalid_generation is less likely
result = await analyze_call_feedback_tolerant(
run = await analyze_call_feedback_tolerant(
CallFeedbackInput(
transcript="[00:01:15] Customer: The product is great!",
call_date=date(2024, 1, 15)
)
)
if not result.positive_points and not result.negative_points:
if not run.positive_points and not run.negative_points:
print("No feedback points were generated!")
```

Expand All @@ -630,15 +656,14 @@ absent will cause `AttributeError` when queried.
async def analyze_call_feedback_stream(input: CallFeedbackInput) -> AsyncIterator[CallFeedbackOutput]:
...

async for result in analyze_call_feedback_stream(
async for run in analyze_call_feedback_stream(
CallFeedbackInput(
transcript="[00:01:15] Customer: The product is great!",
call_date=date(2024, 1, 15)
)
):
# With default values, we can safely check the points as they stream in
print(f"Positive points so far: {len(result.positive_points)}")
print(f"Negative points so far: {len(result.negative_points)}")
print(f"Positive points so far: {len(run.positive_points)}")
print(f"Negative points so far: {len(run.negative_points)}")
```

#### Field properties
Expand Down
75 changes: 75 additions & 0 deletions examples/01_basic_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""
This example demonstrates how to create a basic WorkflowAI agent that takes a city name
and returns information about the capital of its country. It showcases:

1. Basic agent creation with input/output models
2. Field descriptions and examples
3. Cost and latency tracking
"""

import asyncio

from pydantic import BaseModel, Field

import workflowai
from workflowai import Model, Run


class CityInput(BaseModel):
"""Input model for the city-to-capital agent."""
city: str = Field(
description="The name of the city for which to find the country's capital",
examples=["Paris", "New York", "Tokyo"],
)


class CapitalOutput(BaseModel):
"""Output model containing information about the capital city."""
country: str = Field(
description="The country where the input city is located",
examples=["France", "United States", "Japan"],
)
capital: str = Field(
description="The capital city of the country",
examples=["Paris", "Washington D.C.", "Tokyo"],
)
fun_fact: str = Field(
description="An interesting fact about the capital city",
examples=["Paris has been the capital of France since 508 CE"],
)


@workflowai.agent(
id="city-to-capital",
model=Model.CLAUDE_3_5_SONNET_LATEST,
)
async def get_capital_info(city_input: CityInput) -> Run[CapitalOutput]:
"""
Find the capital city of the country where the input city is located.

Guidelines:
1. First identify the country where the input city is located
2. Then provide the capital city of that country
3. Include an interesting historical or cultural fact about the capital
4. Be accurate and precise with geographical information
5. If the input city is itself the capital, still provide the information
"""
...


async def main():
# Example 1: Basic usage with Paris
print("\nExample 1: Basic usage with Paris")
print("-" * 50)
run = await get_capital_info(CityInput(city="Paris"))
print(run)

# Example 2: Using Tokyo
print("\nExample 2: Using Tokyo")
print("-" * 50)
run = await get_capital_info(CityInput(city="Tokyo"))
print(run)


if __name__ == "__main__":
asyncio.run(main())
159 changes: 159 additions & 0 deletions examples/03_caching.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
"""
This example demonstrates the different caching options in WorkflowAI:
1. 'auto' - Cache only when temperature is 0 (default)
2. 'always' - Always use cache if available
3. 'never' - Never use cache, always execute new runs

The example uses a medical SOAP notes extractor to show how caching affects:
- Response consistency (important for medical documentation)
- Cost efficiency
- Execution time
"""

import asyncio
import time
from typing import Literal, TypedDict

from pydantic import BaseModel, Field

import workflowai
from workflowai import Model, Run

# Import CacheUsage type
CacheUsage = Literal["auto", "always", "never"]


class SOAPInput(BaseModel):
"""Input containing a medical consultation transcript."""
transcript: str = Field(
description="The medical consultation transcript to analyze",
)


class SOAPNote(BaseModel):
"""Structured SOAP note components."""
subjective: list[str] = Field(
description="Patient's symptoms, complaints, and history as reported",
examples=["Patient reports severe headache for 3 days", "Denies fever or nausea"],
)
objective: list[str] = Field(
description="Observable, measurable findings from examination",
examples=["BP 120/80", "Temperature 98.6°F", "No visible inflammation"],
)
assessment: list[str] = Field(
description="Diagnosis or clinical impressions",
examples=["Tension headache", "Rule out migraine"],
)
plan: list[str] = Field(
description="Treatment plan and next steps",
examples=["Prescribed ibuprofen 400mg", "Follow up in 2 weeks"],
)


@workflowai.agent(
id="soap-extractor",
model=Model.LLAMA_3_3_70B,
)
async def extract_soap_notes(soap_input: SOAPInput) -> Run[SOAPNote]:
"""
Extract SOAP notes from a medical consultation transcript.

Guidelines:
1. Analyze the transcript to identify key medical information
2. Organize information into SOAP format:
- Subjective: Patient's symptoms, complaints, and history
- Objective: Physical examination findings and test results
- Assessment: Diagnosis or clinical impression
- Plan: Treatment plan and next steps

3. Be thorough but concise
4. Use medical terminology appropriately
5. Include all relevant information from the transcript
"""
...


class ResultMetrics(TypedDict):
option: str
duration: float
cost: float


async def demonstrate_caching(transcript: str):
"""Run the same transcript with different caching options and compare results."""

print("\nComparing caching options")
print("-" * 50)

cache_options: list[CacheUsage] = ["auto", "always", "never"]
results: list[ResultMetrics] = []

for cache_option in cache_options:
start_time = time.time()

run = await extract_soap_notes(
SOAPInput(transcript=transcript),
use_cache=cache_option,
)

duration = time.time() - start_time

# Store metrics for comparison
results.append({
"option": cache_option,
"duration": duration,
"cost": float(run.cost_usd or 0.0), # Convert None to 0.0
})

# Print comparison table
print("\nResults Comparison:")
print("-" * 50)
print(f"{'Cache Option':<12} {'Duration':<10} {'Cost':<8}")
print("-" * 50)

for r in results:
print(
f"{r['option']:<12} "
f"{r['duration']:.2f}s{'*' if r['duration'] < 0.1 else '':<8} "
f"${r['cost']:<7}",
)

print("-" * 50)
print("* Very fast response indicates cached result")


async def main():
# Example medical consultation transcript
transcript = """
Patient is a 45-year-old female presenting with severe headache for the past 3 days.
She describes the pain as throbbing, primarily on the right side of her head.
Pain level reported as 7/10. Denies fever, nausea, or visual disturbances.
Previous history of migraines, but states this feels different.

Vital signs stable: BP 120/80, HR 72, Temp 98.6°F.
Physical exam shows mild tenderness in right temporal area.
No neurological deficits noted.
Eye examination normal, no papilledema.

Assessment suggests tension headache, but need to rule out migraine.
No red flags for secondary causes identified.

Plan: Prescribed ibuprofen 400mg q6h for pain.
Recommended stress reduction techniques.
Patient education provided regarding headache triggers.
Follow up in 2 weeks, sooner if symptoms worsen.
Return precautions discussed.
"""

print("\nDemonstrating different caching options")
print("=" * 50)
print("This example shows how caching affects the agent's behavior:")
print("- 'auto': Caches only when temperature is 0 (default)")
print("- 'always': Reuses cached results when available")
print("- 'never': Generates new results every time")

await demonstrate_caching(transcript)


if __name__ == "__main__":
asyncio.run(main())
Loading