Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ for point in run.negative_points:

WorkflowAI supports a long list of models. The source of truth for models we support is on [workflowai.com](https://workflowai.com). The [Model enum](./workflowai/core/domain/model.py) is a good indication of what models are supported at the time of the SDK release, although it may be missing some models since new ones are added all the time.

You can set the model explicitly in the agent decorator:
You can specify the model in two ways:

1. In the agent decorator:

```python
from workflowai import Model
Expand All @@ -194,6 +196,19 @@ async def analyze_call_feedback(input: CallFeedbackInput) -> CallFeedbackOutput:
...
```

2. As a function parameter when calling the agent:

```python
@workflowai.agent(id="analyze-call-feedback")
async def analyze_call_feedback(input: CallFeedbackInput) -> CallFeedbackOutput:
...

# Call with specific model
result = await analyze_call_feedback(input_data, model=Model.GPT_4O_LATEST)
```

This flexibility allows you to either fix the model in the agent definition or dynamically choose different models at runtime.

> Models do not become invalid on WorkflowAI. When a model is retired, it will be replaced dynamically by
> a newer version of the same model with the same or a lower price, so calling the API with
> a retired model will always work.
Expand Down
162 changes: 162 additions & 0 deletions examples/15_text_to_sql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
"""
This example demonstrates how to convert natural language questions to SQL queries.
It uses a sample e-commerce database schema and shows how to generate safe and efficient SQL queries.

Like example 14 (templated instructions), this example shows how to use variables in the agent's
instructions. The template variables ({{ db_schema }} and {{ question }}) are automatically populated
from the input model's fields, allowing the instructions to adapt based on the input.

The example includes:
1. Simple SELECT query with conditions
2. JOIN query with aggregation
3. Complex query with multiple JOINs, grouping, and ordering
"""

import asyncio

from pydantic import BaseModel, Field

import workflowai
from workflowai import Model, Run


class SQLGenerationInput(BaseModel):
    """Input model for the SQL generation agent."""

    # Full DDL text (CREATE TABLE statements); injected into the agent's
    # instructions via the {{ db_schema }} template variable.
    db_schema: str = Field(
        description="The complete SQL schema with CREATE TABLE statements",
    )
    # Natural-language request to translate; injected via {{ question }}.
    question: str = Field(
        description="The natural language question to convert to SQL",
    )


class SQLGenerationOutput(BaseModel):
    """Output model containing the generated SQL query and explanation."""

    # The SQL statement produced by the agent.
    sql_query: str = Field(
        description="The generated SQL query",
    )
    # Rationale for how the query is structured and why.
    explanation: str = Field(
        description="Explanation of what the query does and why certain choices were made",
    )
    # Names of the tables the generated query references.
    tables_used: list[str] = Field(
        description="List of tables referenced in the query",
    )


# The docstring below is the agent's PROMPT, not ordinary documentation:
# the {{ db_schema }} and {{ question }} template variables are populated
# from the fields of SQLGenerationInput at call time. The function body is
# intentionally empty (...) — the decorator provides the implementation,
# presumably by calling the WorkflowAI service (TODO confirm).
@workflowai.agent(
    id="text-to-sql",
    model=Model.CLAUDE_3_5_SONNET_LATEST,
)
# NOTE(review): the parameter name "review_input" looks left over from a
# feedback-review example; only the input model's field names matter for
# the instruction template, so behavior is unaffected.
async def generate_sql(review_input: SQLGenerationInput) -> Run[SQLGenerationOutput]:
    """
    Convert natural language questions to SQL queries based on the provided schema.

    You are a SQL expert that converts natural language questions into safe and efficient SQL queries.
    The queries should be compatible with standard SQL databases.

    Important guidelines:
    1. NEVER trust user input directly in queries to prevent SQL injection
    2. Use proper quoting and escaping for string values
    3. Use meaningful table aliases for better readability
    4. Format queries with proper indentation and line breaks
    5. Use explicit JOIN conditions (no implicit joins)
    6. Include column names in GROUP BY rather than positions

    Schema:
    {{ db_schema }}

    Question to convert to SQL:
    {{ question }}

    Please provide:
    1. A safe and efficient SQL query
    2. An explanation of the query and any important considerations
    3. List of tables used in the query
    """
    ...


async def main():
    """Run three increasingly complex text-to-SQL conversions and print each run."""
    # Sample e-commerce schema shared by every example below.
    schema = """
    CREATE TABLE customers (
        id INTEGER PRIMARY KEY,
        name TEXT NOT NULL,
        email TEXT UNIQUE NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );

    CREATE TABLE products (
        id INTEGER PRIMARY KEY,
        name TEXT NOT NULL,
        description TEXT,
        price DECIMAL(10,2) NOT NULL,
        category TEXT NOT NULL,
        stock_quantity INTEGER NOT NULL DEFAULT 0,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );

    CREATE TABLE orders (
        id INTEGER PRIMARY KEY,
        customer_id INTEGER NOT NULL,
        order_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
        status TEXT NOT NULL DEFAULT 'pending',
        total_amount DECIMAL(10,2) NOT NULL,
        FOREIGN KEY (customer_id) REFERENCES customers(id)
    );

    CREATE TABLE order_items (
        id INTEGER PRIMARY KEY,
        order_id INTEGER NOT NULL,
        product_id INTEGER NOT NULL,
        quantity INTEGER NOT NULL,
        unit_price DECIMAL(10,2) NOT NULL,
        FOREIGN KEY (order_id) REFERENCES orders(id),
        FOREIGN KEY (product_id) REFERENCES products(id)
    );
    """

    # (title, question) pairs, in increasing order of query complexity:
    # a filtered SELECT, a JOIN with aggregation, then a multi-JOIN analysis.
    examples = [
        (
            "Example 1: Find expensive products",
            "Show me all products that cost more than $100, ordered by price descending",
        ),
        (
            "Example 2: Customer order summary",
            "List all customers with their total number of orders and total spend, "
            "only showing customers who have made at least 2 orders",
        ),
        (
            "Example 3: Product category analysis",
            "What are the top 3 product categories by revenue in the last 30 days, "
            "including the number of unique customers who bought from each category?",
        ),
    ]

    for title, question in examples:
        print(f"\n{title}")
        print("-" * 50)
        run = await generate_sql(
            SQLGenerationInput(
                db_schema=schema,
                question=question,
            ),
        )
        print(run)


if __name__ == "__main__":
    # Script entry point: run the async examples on a fresh event loop.
    asyncio.run(main())
118 changes: 118 additions & 0 deletions examples/16_multi_model_consensus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
"""
This example demonstrates how to ask the same question to multiple different LLMs
and then combine their responses into a single coherent answer using another LLM.

The example uses three different models for answering:
- GPT-4O Mini
- Gemini 2.0 Flash
- Llama 3.3 70B

Then uses O3 Mini (with medium reasoning effort) to analyze and combine their responses.
"""

import asyncio

from pydantic import BaseModel, Field

import workflowai
from workflowai import Model, Run


class MultiModelInput(BaseModel):
    """Input model containing the question to ask all models."""

    # The question text sent to the answering agent.
    question: str = Field(
        description="The question to ask all models",
    )
    # Human-readable label for the model being queried; provided so the
    # response can be attributed to a model (see ModelResponse.model_name).
    model_name: str = Field(
        description="Name of the model providing the response",
    )


class ModelResponse(BaseModel):
    """Response from an individual model."""

    # Label identifying which model produced this answer.
    model_name: str = Field(description="Name of the model that provided this response")
    # The model's free-text answer to the question.
    response: str = Field(description="The model's response to the question")


class CombinerInput(BaseModel):
    """Input for the response combiner."""

    # One entry per answering model, in the order they were queried.
    responses: list[ModelResponse] = Field(description="List of responses to combine")


class CombinedOutput(BaseModel):
    """Final output combining responses from all models."""

    # The synthesized, final answer.
    combined_answer: str = Field(
        description="Synthesized answer combining insights from all models",
    )
    # How the individual responses were merged into the final answer.
    explanation: str = Field(
        description="Explanation of how the responses were combined and why",
    )


# No model= in the decorator: the model is chosen per call instead —
# main() passes model=... when invoking this agent, matching the
# runtime-model-selection pattern shown in the README.
@workflowai.agent(
    id="question-answerer",
)
async def get_model_response(query: MultiModelInput) -> Run[ModelResponse]:
    """Get response from the specified model."""
    # Body intentionally empty: the decorator supplies the implementation.
    ...


# The docstring below is this agent's PROMPT, not ordinary documentation;
# the body is intentionally empty (...) because the decorator supplies
# the implementation.
@workflowai.agent(
    id="response-combiner",
    model=Model.O3_MINI_2025_01_31_MEDIUM_REASONING_EFFORT,
)
async def combine_responses(responses_input: CombinerInput) -> Run[CombinedOutput]:
    """
    Analyze and combine responses from multiple models into a single coherent answer.

    You are an expert at analyzing and synthesizing information from multiple sources.
    Your task is to:
    1. Review the responses from different models
    2. Identify key insights and unique perspectives from each
    3. Create a comprehensive answer that combines the best elements
    4. Explain your synthesis process

    Please ensure the combined answer is:
    - Accurate and well-reasoned
    - Incorporates unique insights from each model
    - Clear and coherent
    - Properly attributed when using specific insights
    """
    ...


async def main():
    """Ask one question to several models, then synthesize their answers."""
    # Example: Scientific explanation
    print("\nExample: Scientific Concept")
    print("-" * 50)
    question = "What is dark matter and why is it important for our understanding of the universe?"

    # (model enum value, human-readable label) pairs to fan the question out to.
    models = [
        (Model.GPT_4O_MINI_LATEST, "GPT-4O Mini"),
        (Model.GEMINI_2_0_FLASH_LATEST, "Gemini 2.0 Flash"),
        (Model.LLAMA_3_3_70B, "Llama 3.3 70B"),
    ]

    # Query each model sequentially, overriding the agent's model per call,
    # and keep only the structured output of each run.
    responses = [
        (
            await get_model_response(
                MultiModelInput(
                    question=question,
                    model_name=label,
                ),
                model=model,
            )
        ).output
        for model, label in models
    ]

    # Hand every answer to the combiner agent and print the result.
    combined = await combine_responses(CombinerInput(responses=responses))
    print(combined)


if __name__ == "__main__":
    # Script entry point: run the async example on a fresh event loop.
    asyncio.run(main())
Loading