diff --git a/README.md b/README.md
index 721df18a3..d085e876a 100644
--- a/README.md
+++ b/README.md
@@ -34,22 +34,29 @@ ART's **LangGraph integration** enables you to train sophisticated ReAct-style a
 
 ```python
 import art
-from art.langgraph import wrap_rollout, init_chat_model
-from langgraph import create_react_agent
-
-# Your existing tools
-tools = [search_inbox, read_email, return_final_answer]
-
-@wrap_rollout(model)
-async def run_agent(scenario: str) -> art.Trajectory:
-    # Create LangGraph agent with ART's LLM wrapper
-    agent = create_react_agent(init_chat_model(), tools)
-
-    result = await agent.ainvoke({"messages": [("user", scenario)]})
-    return art.Trajectory()  # Automatically captured
-
-# Train with RULER - no reward engineering needed!
-await art.train(model, reward_function="ruler")
+from art.langgraph import init_chat_model
+from langgraph.prebuilt import create_react_agent
+
+# Your existing tools
+tools = [search_inbox, read_email, return_final_answer]
+
+async def email_rollout(model: art.Model, scenario: str) -> art.Trajectory:
+    traj = art.Trajectory(reward=0.0, messages_and_choices=[])
+
+    # Create LangGraph agent with ART's chat model
+    chat_model = init_chat_model(model.name)
+    agent = create_react_agent(chat_model, tools)
+
+    await agent.ainvoke({"messages": [("user", scenario)]})
+    traj.reward = 1.0  # Score based on results
+    return traj
+
+# Train your agent
+scenarios = ["Find urgent emails", "Search Q4 budget"]
+groups = await art.gather_trajectory_groups(
+    (art.TrajectoryGroup(email_rollout(model, s) for _ in range(4)) for s in scenarios)
+)
+await model.train(groups)
 ```
 
 [📖 Learn more about LangGraph integration →](https://art.openpipe.ai/integrations/langgraph-integration) | [🏋️ Try the notebook →](https://colab.research.google.com/github/openpipe/art-notebooks/blob/main/examples/langgraph/art-e-langgraph.ipynb)
diff --git a/docs/integrations/langgraph-integration.mdx b/docs/integrations/langgraph-integration.mdx
index 30456e71e..4dad672f8 100644
--- a/docs/integrations/langgraph-integration.mdx
+++ b/docs/integrations/langgraph-integration.mdx
@@ -7,6 +7,16 @@ description: "Build and train sophisticated AI agents using LangGraph with ART's
 ART's LangGraph integration enables you to build sophisticated, multi-step AI agents that learn and improve through reinforcement training. By combining LangGraph's powerful agent framework with ART's training capabilities, you can create agents that reason, use tools, and adapt their behavior over time.
+## Installation
+
+To use ART with LangGraph, install ART with the required extras:
+
+```bash
+uv pip install -U "openpipe-art[backend,langgraph]>=0.4.9"
+```
+
+The `langgraph` extra includes the LangGraph integration dependencies, while `backend` provides the training backend components. If you are running with the [SkyPilotBackend](/fundamentals/art-backend#skypilotbackend), substitute `skypilot` for `backend` in the extras list.
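+
+For example, the SkyPilot variant of the install command above would be:
+
+```bash
+uv pip install -U "openpipe-art[skypilot,langgraph]>=0.4.9"
+```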
+
 
 ## Why Use ART with LangGraph?
 
 LangGraph provides an excellent framework for building various types of agents - from ReAct-style reasoning agents to complex multi-agent workflows with supervisor patterns and parallel execution. However, getting these agents to perform optimally often requires extensive prompt engineering and manual tuning.
 ART's integration with LangGraph addresses this by:
@@ -25,62 +35,132 @@ LangGraph provides an excellent framework for building various types of agents -
 
 ## Basic Usage
 
-Here's how to integrate ART with your existing LangGraph agent:
+Here's how to integrate ART with your LangGraph agent:
 
 ```python
+import uuid
+from langchain_core.messages import HumanMessage, SystemMessage
+from langchain_core.tools import tool
+from langgraph.prebuilt import create_react_agent
+from pydantic import BaseModel
+from art.langgraph import init_chat_model
 import art
-from art.langgraph import wrap_rollout, init_chat_model
-from art.local import LocalBackend
-from langgraph import create_react_agent
 
-# Define your tools
-def search_inbox(query: str) -> str:
-    """Search for emails matching the query."""
-    # Your search implementation
-    return f"Found emails matching: {query}"
+# Custom trajectory class to store additional data
+class EmailTrajectory(art.Trajectory):
+    final_answer: str | None = None
+    found_emails: list[str] = []
+
+# Scenario data class
+class EmailScenario(BaseModel):
+    step: int
+    query: str
+    user_email: str
+
+# Rollout function that integrates with LangGraph
+async def email_rollout(model: art.Model, scenario: EmailScenario) -> EmailTrajectory:
+    traj = EmailTrajectory(
+        reward=0.0,
+        messages_and_choices=[],
+        metadata={
+            "step": scenario.step,
+            "query": scenario.query,
+        },
+    )
+
+    # Define tools inside the rollout to access local variables
+    @tool
+    def search_emails(keywords: list[str]) -> list[dict]:
+        """Search emails by keywords."""
+        # Your email search implementation here
+        results = [
+            {"id": "email_1", "subject": f"Found email about {keywords[0]}", "from": "john@company.com"},
+            {"id": "email_2", "subject": f"Another email about {keywords[0]}", "from": "sarah@company.com"},
+        ]
+        traj.found_emails.extend([r["id"] for r in results])
+        return results
+
+    @tool
+    def read_email(email_id: str) -> dict:
+        """Read a specific email."""
+        # Your email reading implementation here
+        return {
+            "id": email_id,
+            "content": f"Email content for {email_id}",
+            "timestamp": "2024-01-15"
+        }
+
+    @tool
+    def provide_answer(answer: str) -> str:
+        """Provide the final answer."""
+        traj.final_answer = answer
+        # Score the trajectory based on quality (simple example)
+        traj.reward = 1.0 if len(answer) > 10 else 0.0
+        return f"Final answer: {answer}"
+
+    # Create LangGraph tools and agent
+    tools = [search_emails, read_email, provide_answer]
+    chat_model = init_chat_model(model.name, temperature=0.7)
+    agent = create_react_agent(chat_model, tools)
+
+    # Run the agent
+    try:
+        config = {
+            "configurable": {"thread_id": str(uuid.uuid4())},
+            "recursion_limit": 20,
+        }
+
+        await agent.ainvoke(
+            {
+                "messages": [
+                    SystemMessage(content=f"You are an email assistant for {scenario.user_email}. Help find and read emails."),
+                    HumanMessage(content=scenario.query),
+                ]
+            },
+            config=config,
+        )
 
-def read_email(email_id: str) -> str:
-    """Read a specific email by ID."""
-    # Your email reading implementation
-    return f"Email content for {email_id}"
+    except Exception as e:
+        print(f"Error in agent execution: {e}")
+        traj.reward = 0.0
 
-tools = [search_inbox, read_email]
+    return traj
 
 async def train_email_agent():
-    with LocalBackend() as backend:
-        # Create your trainable model
-        model = art.TrainableModel(
-            name="email-agent-langgraph",
-            project="email-search-agent",
-            base_model="Qwen/Qwen2.5-7B-Instruct",
-        )
+    from art.local import LocalBackend
 
-        await backend.register_model(model)
+    backend = LocalBackend()
 
-        # Define your rollout function
-        @wrap_rollout(model)
-        async def run_agent(scenario: str) -> art.Trajectory:
-            # Create the LangGraph agent with ART's LLM wrapper
-            agent = create_react_agent(init_chat_model(), tools)
+    # Create your trainable model
+    model = art.TrainableModel(
+        name="email-agent-langgraph",
+        project="email-search-agent",
+        base_model="Qwen/Qwen2.5-7B-Instruct",
+    )
 
-            # Run the agent
-            result = await agent.ainvoke({"messages": [("user", scenario)]})
-
-            # Return trajectory (automatically captured by wrap_rollout)
-            return art.Trajectory()
-
-        # Generate training data
-        scenarios = [
-            "Find emails from John about the quarterly report",
-            "Search for emails containing budget discussions from last week",
-            "Find the latest email from Sarah and summarize it",
-        ]
+    await model.register(backend)
 
-        for scenario in scenarios:
-            await run_agent(scenario)
+    # Create training scenarios
+    scenarios = [
+        EmailScenario(step=0, query="Find emails from John about quarterly reports", user_email="user@company.com"),
+        EmailScenario(step=0, query="Search for budget discussions from last week", user_email="user@company.com"),
+        EmailScenario(step=0, query="Find the latest email from Sarah", user_email="user@company.com"),
+    ]
 
-        # Start training with RULER
-        await art.train(model, reward_function="ruler")
+    # Generate trajectory groups
+    trajectory_groups = await art.gather_trajectory_groups(
+        (
+            art.TrajectoryGroup(
+                email_rollout(model, scenario)
+                for _ in range(4)  # Multiple trajectories per scenario
+            )
+            for scenario in scenarios
+        ),
+        pbar_desc="gather"
+    )
+
+    # Train the model
+    await model.train(trajectory_groups, config=art.TrainConfig(learning_rate=1e-5))
 
 if __name__ == "__main__":
     import asyncio
@@ -89,122 +169,303 @@ if __name__ == "__main__":
 
 ## How It Works
 
-The ART-LangGraph integration works through two main components:
+The ART-LangGraph integration works through these key components:
 
 ### 1. LLM Wrapper (`init_chat_model`)
 
-Replaces LangGraph's standard LLM initialization with ART's logging-enabled wrapper:
+ART's `init_chat_model` automatically captures all LLM interactions for training:
 
 ```python
-# Standard LangGraph
-from langchain_openai import ChatOpenAI
-llm = ChatOpenAI(model="gpt-4")
-
-# With ART integration
 from art.langgraph import init_chat_model
-llm = init_chat_model()  # Automatically uses your model's inference settings
+
+# Create a chat model that logs to ART
+chat_model = init_chat_model(model.name, temperature=0.7)
+
+# Use with LangGraph as normal
+agent = create_react_agent(chat_model, tools)
 ```
 
-The wrapper captures all LLM interactions, including:
+**What gets captured:**
 
-- Input messages and prompts
-- Generated responses and tool calls
-- Tool execution results
-- Multi-step reasoning chains
+- All chat completions and their responses
+- Tool calls and tool execution results
+- Token usage and timing information
+- Error states and exceptions
 
-### 2. Rollout Wrapper (`wrap_rollout`)
+### 2. Custom Trajectory Classes
 
-Automatically converts your agent execution into ART trajectories:
+Extend `art.Trajectory` to store domain-specific data alongside the conversation:
 
 ```python
-@wrap_rollout(model)
-async def run_agent(scenario: str) -> art.Trajectory:
-    # Your agent logic here
-    agent = create_react_agent(init_chat_model(), tools)
-    result = await agent.ainvoke({"messages": [("user", scenario)]})
-    return art.Trajectory()  # Automatically populated from logs
+class MyTrajectory(art.Trajectory):
+    final_answer: str | None = None
+    tools_used: list[str] = []
+    custom_metrics: dict = {}
 ```
 
-The wrapper:
+This allows you to:
 
-- Creates unique execution threads for each agent run
-- Logs all intermediate steps and tool calls
-- Converts LangGraph messages to ART's training format
-- Handles complex multi-turn conversations automatically
+- Store the agent's final outputs
+- Track custom metrics during execution
+- Score trajectories based on your specific criteria
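+
+One way to use them (shown here only as a minimal sketch, with a hypothetical `score` helper that is not part of ART) is to fold these fields into the reward just before returning the trajectory:
+
+```python
+def score(traj: MyTrajectory) -> float:
+    # No final answer means the agent never finished the task
+    if traj.final_answer is None:
+        return 0.0
+    # Reward finishing, with a small bonus for actually using tools
+    return 1.0 if traj.tools_used else 0.5
+
+# Inside your rollout, just before `return traj`:
+# traj.reward = score(traj)
+```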
 
-## Advanced Example: Email Search Agent
+### 3. Rollout Functions
 
-Here's a more complete example of training an email search agent:
+Instead of decorators, define rollout functions that create and execute your LangGraph agent:
 
 ```python
-import art
-from art.langgraph import wrap_rollout, init_chat_model
-from art.local import LocalBackend
-from langgraph import create_react_agent
-from typing import List
-
-def search_inbox(query: str, limit: int = 5) -> str:
-    """Search emails with improved functionality."""
-    # Simulate email search with realistic results
-    results = [
-        f"Email {i}: Subject matching '{query}' from user@example.com"
-        for i in range(min(limit, 3))
-    ]
-    return "\n".join(results) if results else "No emails found."
+async def my_rollout(model: art.Model, scenario: MyScenario) -> MyTrajectory:
+    # Create custom trajectory
+    traj = MyTrajectory(reward=0.0, messages_and_choices=[])
 
-def read_email(email_id: str) -> str:
-    """Read email with error handling."""
-    if not email_id.isdigit():
-        return "Error: Invalid email ID format"
-    return f"Email {email_id}: [Email content here...]"
+    # Define tools with access to trajectory
+    @tool
+    def my_tool(arg: str) -> str:
+        traj.tools_used.append("my_tool")  # Track usage
+        return "result"
 
-def return_final_answer(answer: str) -> str:
-    """Return the final answer to the user."""
-    return f"Final Answer: {answer}"
+    # Create and run agent
+    chat_model = init_chat_model(model.name)
+    agent = create_react_agent(chat_model, [my_tool])
 
-tools = [search_inbox, read_email, return_final_answer]
+    result = await agent.ainvoke({"messages": [...]})
 
-async def train_advanced_email_agent():
-    with LocalBackend() as backend:
-        model = art.TrainableModel(
-            name="advanced-email-agent",
-            project="email-agents",
-            base_model="Qwen/Qwen2.5-7B-Instruct",
+    # Score the trajectory based on results
+    traj.reward = calculate_reward(result)
+
+    return traj
+```
+
+### 4. Training Loop Integration
+
+Use your rollout functions directly with ART's training loop:
+
+```python
+# Generate trajectory groups
+trajectory_groups = await art.gather_trajectory_groups(
+    (
+        art.TrajectoryGroup(
+            my_rollout(model, scenario)
+            for _ in range(trajectories_per_scenario)
         )
+        for scenario in scenarios
+    ),
+    pbar_desc="gather"
+)
 
-        await backend.register_model(model)
+# Train the model
+await model.train(trajectory_groups)
+```
 
-        @wrap_rollout(model)
-        async def run_email_agent(scenario: str) -> art.Trajectory:
-            agent = create_react_agent(init_chat_model(), tools)
+## Advanced Example: Email Search Agent with Custom Judging
 
-            result = await agent.ainvoke({
-                "messages": [("user", scenario)]
-            })
+Here's a complete example that shows custom trajectory scoring, similar to production usage:
 
-            return art.Trajectory()
+```python
+import uuid
+from pydantic import BaseModel, Field
+from langchain_core.messages import HumanMessage, SystemMessage
+from langchain_core.tools import tool
+from langgraph.prebuilt import create_react_agent
+from litellm import acompletion
+from tenacity import retry, stop_after_attempt
+from art.langgraph import init_chat_model
+import art
 
-        # Diverse training scenarios
-        scenarios = [
-            "Find the most recent email from the finance team about Q4 budget",
-            "Search for emails containing 'meeting' and summarize the key points",
-            "Look for urgent emails from management and provide a brief overview",
-            "Find emails about project deadlines and list them by priority",
+# Custom response format for judging
+class CorrectnessJudge(BaseModel):
+    reasoning: str = Field(description="Explanation of the reasoning process.")
+    score: float = Field(description="Score from 0.0 to 1.0 for answer quality.")
+
+# Enhanced trajectory with custom fields
+class EmailTrajectory(art.Trajectory):
+    final_answer: str | None = None
+    emails_searched: list[str] = []
+    emails_read: list[str] = []
+
+# Scenario with ground truth for comparison
+class EmailScenario(BaseModel):
+    step: int
+    query: str
+    user_email: str
+    expected_answer: str  # Ground truth for judging
+
+# Custom judge function
+@retry(stop=stop_after_attempt(3))
+async def judge_email_answer(scenario: EmailScenario, answer: str) -> CorrectnessJudge:
+    """Judge the quality of an email search answer."""
+    system_prompt = """
+    You are evaluating an AI assistant's answer to an email search query.
+    Consider accuracy, completeness, and helpfulness.
+    Score from 0.0 (completely wrong) to 1.0 (perfect answer).
+    """
+
+    messages = [
+        {"role": "system", "content": system_prompt},
+        {
+            "role": "user",
+            "content": f"Query: {scenario.query}\nExpected: {scenario.expected_answer}\nAI Answer: {answer}"
+        },
+    ]
+
+    response = await acompletion(
+        model="openai/gpt-4o-mini",
+        messages=messages,
+        response_format=CorrectnessJudge,
+    )
+
+    raw_content = response.choices[0].message.content or "{}"
+    try:
+        return CorrectnessJudge.model_validate_json(raw_content)
+    except Exception as e:
+        return CorrectnessJudge(reasoning=f"Parse error: {e}", score=0.0)
+
+# Main rollout function
+async def email_rollout(model: art.Model, scenario: EmailScenario) -> EmailTrajectory:
+    traj = EmailTrajectory(
+        reward=0.0,
+        messages_and_choices=[],
+        metadata={
+            "step": scenario.step,
+            "query": scenario.query,
+        },
+    )
+
+    # Tools with trajectory tracking
+    @tool
+    def search_emails(keywords: list[str]) -> list[dict]:
+        """Search emails by keywords."""
+        # Simulate real email search
+        results = [
+            {"id": f"email_{i}", "subject": f"Email about {kw}", "from": "colleague@company.com"}
+            for i, kw in enumerate(keywords[:3])
         ]
+        traj.emails_searched.extend([r["id"] for r in results])
+        return results
+
+    @tool
+    def read_email(email_id: str) -> dict:
+        """Read a specific email."""
+        traj.emails_read.append(email_id)
+        return {
+            "id": email_id,
+            "content": f"Content of {email_id}: Important project information...",
+            "timestamp": "2024-01-15T10:30:00Z"
+        }
+
+    @tool
+    def provide_final_answer(answer: str) -> str:
+        """Provide the final answer to the user's query."""
+        traj.final_answer = answer
+        return f"Final answer: {answer}"
+
+    # Create and run agent
+    tools = [search_emails, read_email, provide_final_answer]
+    chat_model = init_chat_model(model.name, temperature=0.7)
+    agent = create_react_agent(chat_model, tools)
+
+    try:
+        config = {
+            "configurable": {"thread_id": str(uuid.uuid4())},
+            "recursion_limit": 20,
+        }
+
+        await agent.ainvoke(
+            {
+                "messages": [
+                    SystemMessage(content=f"You are an email assistant for {scenario.user_email}. Search and read emails to answer the query."),
+                    HumanMessage(content=scenario.query),
+                ]
+            },
+            config=config,
+        )
+
+        # Score the trajectory if we got a final answer
+        if traj.final_answer:
+            judge_result = await judge_email_answer(scenario, traj.final_answer)
+            traj.reward = judge_result.score
+            traj.metadata["judge_reasoning"] = judge_result.reasoning
+        else:
+            traj.reward = 0.0  # No answer provided
+
+    except Exception as e:
+        print(f"Error in email agent: {e}")
+        traj.reward = 0.0
 
-        # Generate training trajectories
-        for scenario in scenarios:
-            trajectory = await run_email_agent(scenario)
-            print(f"Generated trajectory for: {scenario}")
+    return traj
 
-        # Train with RULER
-        await art.train(model, reward_function="ruler")
+async def train_advanced_email_agent():
+    from art.local import LocalBackend
+
+    backend = LocalBackend()
+
+    model = art.TrainableModel(
+        name="advanced-email-agent",
+        project="email-agents",
+        base_model="Qwen/Qwen2.5-7B-Instruct",
+    )
+
+    await model.register(backend)
+
+    # Training scenarios with expected answers
+    scenarios = [
+        EmailScenario(
+            step=0,
+            query="Find emails from the finance team about Q4 budget",
+            user_email="manager@company.com",
+            expected_answer="Q4 budget is $2.5M with 15% increase from Q3..."
+        ),
+        EmailScenario(
+            step=0,
+            query="Search for urgent emails from CEO this week",
+            user_email="manager@company.com",
+            expected_answer="CEO sent urgent email about board meeting on Friday..."
+        ),
+    ]
+
+    # Generate trajectory groups
+    trajectory_groups = await art.gather_trajectory_groups(
+        (
+            art.TrajectoryGroup(
+                email_rollout(model, scenario)
+                for _ in range(4)  # Multiple attempts per scenario
+            )
+            for scenario in scenarios
+        ),
+        pbar_desc="email agent training"
+    )
+
+    # Train with custom learning rate
+    await model.train(
+        trajectory_groups,
+        config=art.TrainConfig(learning_rate=5e-6)
+    )
 
 if __name__ == "__main__":
     import asyncio
     asyncio.run(train_advanced_email_agent())
 ```
+
+## Troubleshooting
+
+### Common Issues
+
+**Empty trajectories or no training data captured:**
+
+- Ensure you're using `init_chat_model(model.name)` in your rollout function
+- Verify your rollout function actually executes the agent and makes LLM calls
+- Check that `init_chat_model()` is called before creating your LangGraph agent
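+
+A quick way to confirm data is being captured is to run a single rollout outside the training loop and inspect the result. The sketch below reuses `EmailScenario` and `email_rollout` from the advanced example above and assumes a registered `model` is already in scope; the `expected_answer` is only a placeholder:
+
+```python
+import asyncio
+
+async def debug_single_rollout():
+    scenario = EmailScenario(
+        step=0,
+        query="Find emails from John about quarterly reports",
+        user_email="user@company.com",
+        expected_answer="...",  # placeholder ground truth for the judge
+    )
+    traj = await email_rollout(model, scenario)
+    # If these stay empty or zero, the agent never called the model or its tools
+    print("reward:", traj.reward)
+    print("final answer:", traj.final_answer)
+    print("emails searched:", traj.emails_searched)
+
+asyncio.run(debug_single_rollout())
+```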
+
+**Import errors:**
+
+- Install ART with the correct extras: `uv pip install -U "openpipe-art[backend,langgraph]>=0.4.9"`
+- Ensure you have the required LangGraph dependencies
+
+**Training not starting:**
+
+- Verify you have trajectory data with `await art.gather_trajectory_groups(...)`
+- Check that the model is properly registered with `await model.register(backend)`
+
 
 ## Best Practices
 
 ### Agent Design