Description
Overview
The RAG (Retrieval-Augmented Generation) Agent pattern enables agents to dynamically control retrieval strategies, going beyond basic RAG pipelines. Instead of a fixed "retrieve → generate" flow, an agentic RAG system uses an LLM as a reasoning engine that decides what to retrieve, when to retrieve, and how to refine queries. This results in adaptive retrieval with planning, reflection, and iterative refinement.
How It Works
Agentic RAG vs Basic RAG
Basic RAG (Pipeline):
User Query → Embed → Vector Search → Retrieve Top-K → Generate Answer
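For contrast, the fixed pipeline above can be sketched in a few lines. The toy corpus, bag-of-words "embedding", and cosine search below are stand-ins for a real embedding model and vector DB, not any particular library's API:

```python
# Basic RAG sketch: fixed retrieve -> generate, no agent decisions.
# embed() is a toy bag-of-words "embedding" standing in for a real model.
from collections import Counter
import math

CORPUS = {
    "doc1": "Q4 revenue grew 23 percent year over year",
    "doc2": "The office cafeteria menu changes weekly",
}

def embed(text: str) -> Counter:
    """Toy 'embedding': bag-of-words term counts."""
    return Counter(text.lower().split())

def cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[t] * b[t] for t in a)
    na = math.sqrt(sum(v * v for v in a.values()))
    nb = math.sqrt(sum(v * v for v in b.values()))
    return dot / (na * nb) if na and nb else 0.0

def basic_rag(query: str, top_k: int = 1) -> list[str]:
    q = embed(query)
    ranked = sorted(CORPUS, key=lambda d: cosine(q, embed(CORPUS[d])), reverse=True)
    return ranked[:top_k]  # a real pipeline would now prompt the LLM with these docs

print(basic_rag("How much did Q4 revenue grow?"))  # → ['doc1']
```

There is exactly one retrieval pass and no relevance check; whatever top-K returns is what the generator sees.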
Agentic RAG (Loop):
User Query
↓
Agent decides retrieval strategy
↓
├─→ Retrieve from vector DB
├─→ Retrieve from BM25 (keyword search)
├─→ Rerank results
├─→ Grade relevance
├─→ Refine query if needed
└─→ Retrieve again with refined query
↓
Generate grounded answer
↓
Hallucination check
↓
Return answer OR retrieve more evidence
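The loop above reduces to a small control-flow skeleton. Every component function here is a trivial stand-in (hypothetical, not pyworkflow's API); the shape of the loop — grade, generate, ground-check, refine, iteration cap — is the point:

```python
# Agentic RAG control loop. Components are placeholder stand-ins.

def retrieve(query: str) -> list[str]:
    return [f"doc about {query}"]

def grade(query: str, doc: str) -> bool:
    return query.split()[0] in doc

def generate(query: str, docs: list[str]) -> str:
    return f"Answer based on {len(docs)} doc(s)"

def is_grounded(answer: str, docs: list[str]) -> bool:
    return bool(docs)

def refine(query: str, answer: str) -> str:
    return query + " (refined)"

def agentic_rag(query: str, max_iterations: int = 3) -> str:
    answer = "No grounded answer found"
    for _ in range(max_iterations):
        docs = [d for d in retrieve(query) if grade(query, d)]
        answer = generate(query, docs)
        if is_grounded(answer, docs):
            return answer              # grounded -> done
        query = refine(query, answer)  # otherwise refine and retry
    return answer                      # best effort after iteration cap

print(agentic_rag("q4 earnings findings"))
```

The `max_iterations` cap is what keeps the self-correction loop from running forever — the same safeguard the proposed `max_retrieval_iterations` parameter provides below.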
Key Differences
| Feature | Basic RAG | Agentic RAG |
|---|---|---|
| Retrieval | Fixed strategy | Agent decides |
| Query | Single pass | Iterative refinement |
| Relevance | Top-K only | Grading + reranking |
| Errors | No recovery | Self-correction loop |
Flow Example
User: "What were the key findings from our Q4 earnings call?"
AGENT (Router):
→ Classify: needs_retrieval = True
AGENT (Retrieval):
→ Query: "Q4 earnings call findings"
→ Hybrid search: Vector (semantic) + BM25 (keyword)
→ Retrieved: 10 documents
AGENT (Grader):
→ Grade each document for relevance
→ Relevant: 6/10 documents
→ Off-topic: 4/10 documents
→ Keep only relevant docs
AGENT (Reranker):
→ Rerank 6 relevant docs using cross-encoder
→ Top 3 most relevant
AGENT (Generator):
→ Generate answer from top 3 docs
→ "Our Q4 earnings showed 23% revenue growth..."
AGENT (Hallucination Checker):
→ Check if answer is grounded in docs
→ Grounded: True
→ Return answer
If hallucination detected:
→ AGENT (Query Refiner):
→ Refine query: "Q4 2025 earnings revenue growth statistics"
→ Retrieve again with refined query
→ Regenerate answer
Reference Implementations
- LangChain Agentic RAG Tutorial - Official LangGraph guide
- Goodbye Basic RAG — Hello Agents (2026 Playbook) - Comprehensive tutorial
- Elysia: End-to-End Agentic RAG - Weaviate implementation
- Agentic RAG with LangChain & Elasticsearch - Elasticsearch integration
- DataCamp: Agentic RAG Tutorial - Step-by-step guide
- Building Agentic RAG with LangGraph (2026 Guide) - Architecture guide
Proposed PyWorkflow Implementation
from pyworkflow import workflow, step, agent
from pyworkflow.agents import RAGAgent, VectorStore, Reranker
# Define retrieval tools
@step()
async def vector_search(query: str, top_k: int = 10) -> list[dict]:
"""Semantic search using embeddings."""
embedding = await embedding_model.embed(query)
results = await vector_db.search(embedding, top_k=top_k)
return results
@step()
async def keyword_search(query: str, top_k: int = 10) -> list[dict]:
"""Keyword search using BM25."""
results = await bm25_index.search(query, top_k=top_k)
return results
@step()
async def rerank_documents(query: str, documents: list[dict]) -> list[dict]:
"""Rerank documents using cross-encoder."""
scores = await reranker.score(query, [doc["content"] for doc in documents])
ranked = sorted(zip(documents, scores), key=lambda x: x[1], reverse=True)
return [doc for doc, score in ranked]
# Create RAG agent
@agent(
pattern="rag",
model="claude-sonnet-3-7",
retrieval_tools=[vector_search, keyword_search],
reranker=rerank_documents,
enable_hybrid_search=True, # Combine vector + BM25
enable_relevance_grading=True, # Grade retrieved docs
enable_query_refinement=True, # Refine query if needed
enable_hallucination_check=True, # Verify answer is grounded
max_retrieval_iterations=3, # Prevent infinite loops
)
async def research_agent(query: str):
"""
Agentic RAG research assistant.
The agent:
1. Routes query (retrieval needed?)
2. Hybrid search (vector + keyword)
3. Grades relevance of retrieved docs
4. Reranks top docs
5. Generates answer
6. Checks for hallucinations
7. Refines query and retrieves again if needed
"""
pass
# Use the agent
result = await research_agent.run(
"What were the key product announcements in our last all-hands meeting?"
)
print(result.answer)
print(result.retrieved_documents) # Documents used for answer
print(result.retrieval_iterations) # How many retrieval rounds
print(result.grounding_score)  # How well answer is grounded
Advanced: Custom RAG Workflow
from pyworkflow import workflow, step, get_context
from pyworkflow.agents.rag import (
RouterNode,
RetrieverNode,
GraderNode,
GeneratorNode,
HallucinationCheckerNode,
QueryRefinerNode
)
@workflow(durable=True)
async def custom_rag_workflow(query: str):
"""
Custom agentic RAG with explicit control flow.
"""
ctx = get_context()
# 1. Route: Does this query need retrieval?
needs_retrieval = await RouterNode().decide(query)
if not needs_retrieval:
# Direct answer without retrieval
return await GeneratorNode().generate_direct(query)
# 2. Hybrid Retrieval
vector_docs = await vector_search(query, top_k=10)
keyword_docs = await keyword_search(query, top_k=10)
all_docs = vector_docs + keyword_docs
# 3. Grade Relevance
graded_docs = await GraderNode().grade(query, all_docs)
relevant_docs = [doc for doc in graded_docs if doc["relevant"]]
if not relevant_docs:
# No relevant docs - refine query and retry
refined_query = await QueryRefinerNode().refine(query, all_docs)
relevant_docs = await vector_search(refined_query, top_k=5)
# 4. Rerank
top_docs = await rerank_documents(query, relevant_docs[:10])
# 5. Generate Answer
answer = await GeneratorNode().generate(query, top_docs[:3])
# 6. Hallucination Check
is_grounded = await HallucinationCheckerNode().check(answer, top_docs[:3])
if not is_grounded:
# Hallucination detected - retrieve more evidence
refined_query = await QueryRefinerNode().refine(query, answer)
additional_docs = await vector_search(refined_query, top_k=5)
answer = await GeneratorNode().generate(query, additional_docs)
return {
"answer": answer,
"documents": top_docs[:3],
"grounded": is_grounded
}
Event Types
RAG agents record these events:
- AGENT_STARTED - RAG agent execution begins
  { "run_id": "abc123", "query": "...", "retrieval_enabled": true, "hybrid_search": true }
- AGENT_ROUTING_DECISION - Decide if retrieval is needed
  { "needs_retrieval": true, "reason": "Question requires external knowledge" }
- AGENT_RETRIEVAL_STARTED - Begin retrieval
  { "query": "Q4 earnings findings", "iteration": 1, "strategies": ["vector", "bm25"] }
- AGENT_RETRIEVAL_COMPLETED - Retrieval finishes (total_docs is the count after deduplication)
  { "iteration": 1, "vector_results": 10, "bm25_results": 10, "total_docs": 15, "duration_ms": 250 }
- AGENT_GRADING_COMPLETED - Relevance grading done
  { "total_docs": 15, "relevant_docs": 8, "irrelevant_docs": 7, "avg_relevance_score": 0.72 }
- AGENT_RERANKING_COMPLETED - Reranking done
  { "input_docs": 8, "reranked_docs": 8, "top_doc_score": 0.95 }
- AGENT_GENERATION_STARTED - Answer generation begins
  { "num_source_docs": 3, "total_tokens": 2500 }
- AGENT_GENERATION_COMPLETED - Answer generated
  { "answer": "...", "citations": [0, 1, 2], "tokens_generated": 300 }
- AGENT_HALLUCINATION_CHECK - Grounding check
  { "is_grounded": true, "confidence": 0.89, "hallucinated_claims": [] }
- AGENT_QUERY_REFINEMENT - Query refined for re-retrieval
  { "original_query": "Q4 earnings findings", "refined_query": "Q4 2025 revenue growth statistics earnings call", "reason": "Insufficient relevant documents" }
- AGENT_COMPLETED / AGENT_FAILED
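A minimal sketch of how these events might be recorded. The event names come from the list above; the enum values and the in-memory recorder are illustrative assumptions, not pyworkflow's actual context API:

```python
# Sketch: event types from this issue plus a hypothetical in-memory recorder.
import asyncio
from enum import Enum

class EventType(Enum):
    AGENT_STARTED = "agent_started"
    AGENT_ROUTING_DECISION = "agent_routing_decision"
    AGENT_RETRIEVAL_STARTED = "agent_retrieval_started"
    AGENT_RETRIEVAL_COMPLETED = "agent_retrieval_completed"
    AGENT_GRADING_COMPLETED = "agent_grading_completed"
    AGENT_RERANKING_COMPLETED = "agent_reranking_completed"
    AGENT_GENERATION_STARTED = "agent_generation_started"
    AGENT_GENERATION_COMPLETED = "agent_generation_completed"
    AGENT_HALLUCINATION_CHECK = "agent_hallucination_check"
    AGENT_QUERY_REFINEMENT = "agent_query_refinement"
    AGENT_COMPLETED = "agent_completed"
    AGENT_FAILED = "agent_failed"

class Context:
    """Stand-in for a workflow context that persists events."""
    def __init__(self) -> None:
        self.events: list[tuple[EventType, dict]] = []

    async def record_event(self, event_type: EventType, payload: dict) -> None:
        self.events.append((event_type, payload))

ctx = Context()
asyncio.run(ctx.record_event(EventType.AGENT_ROUTING_DECISION,
                             {"needs_retrieval": True}))
print(len(ctx.events))  # → 1
```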
Implementation Details
Router Node
class RouterNode:
async def decide(self, query: str) -> bool:
"""Decide if query needs retrieval or can be answered directly."""
prompt = f"""
Classify this query:
- "retrieval": Needs external knowledge/documents
- "direct": Can be answered from general knowledge
Query: {query}
Respond with ONLY "retrieval" or "direct".
"""
response = await self.llm.generate(messages=[{"role": "user", "content": prompt}])
needs_retrieval = response.text.strip() == "retrieval"
await ctx.record_event(EventType.AGENT_ROUTING_DECISION, {
"needs_retrieval": needs_retrieval,
"query": query
})
return needs_retrieval
Grader Node
class GraderNode:
async def grade(self, query: str, documents: list[dict]) -> list[dict]:
"""Grade each document for relevance to the query."""
graded = []
for doc in documents:
prompt = f"""
Query: {query}
Document: {doc['content'][:500]}...
Is this document relevant to the query?
Respond with ONLY "yes" or "no".
"""
response = await self.llm.generate(messages=[{"role": "user", "content": prompt}])
relevant = response.text.strip().lower() == "yes"
graded.append({**doc, "relevant": relevant})
relevant_count = sum(1 for d in graded if d["relevant"])
await ctx.record_event(EventType.AGENT_GRADING_COMPLETED, {
"total_docs": len(documents),
"relevant_docs": relevant_count,
"irrelevant_docs": len(documents) - relevant_count
})
return graded
Hallucination Checker Node
class HallucinationCheckerNode:
async def check(self, answer: str, documents: list[dict]) -> bool:
"""Check if answer is grounded in source documents."""
doc_content = "\n\n".join([doc["content"] for doc in documents])
prompt = f"""
Answer: {answer}
Source Documents:
{doc_content}
Is the answer fully supported by the source documents?
Respond with ONLY "yes" or "no".
"""
response = await self.llm.generate(messages=[{"role": "user", "content": prompt}])
is_grounded = response.text.strip().lower() == "yes"
await ctx.record_event(EventType.AGENT_HALLUCINATION_CHECK, {
"is_grounded": is_grounded,
"answer_length": len(answer)
})
return is_grounded
Query Refiner Node
from typing import Any

class QueryRefinerNode:
async def refine(self, original_query: str, context: Any) -> str:
"""Refine query based on retrieval failures or answer quality."""
prompt = f"""
Original query: {original_query}
The initial retrieval did not find sufficient relevant documents.
Rewrite the query to be more specific and use different keywords.
Respond with ONLY the refined query.
"""
response = await self.llm.generate(messages=[{"role": "user", "content": prompt}])
refined_query = response.text.strip()
await ctx.record_event(EventType.AGENT_QUERY_REFINEMENT, {
"original_query": original_query,
"refined_query": refined_query
})
return refined_query
Hybrid Search Implementation
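An alternative to the first-seen deduplication used in the implementation below is reciprocal rank fusion (RRF), which scores each document by its rank in every result list. A minimal sketch — the `{"id": ...}` doc shape matches the examples in this issue, and `rrf_fuse` is a hypothetical helper, not a pyworkflow API:

```python
# Reciprocal rank fusion: score = sum over lists of 1 / (k + rank).
# Documents appearing high in both rankings win; k=60 is the common default.

def rrf_fuse(rankings: list[list[dict]], k: int = 60) -> list[dict]:
    scores: dict[str, float] = {}
    by_id: dict[str, dict] = {}
    for ranking in rankings:
        for rank, doc in enumerate(ranking, start=1):
            scores[doc["id"]] = scores.get(doc["id"], 0.0) + 1.0 / (k + rank)
            by_id[doc["id"]] = doc
    ordered = sorted(scores, key=scores.get, reverse=True)
    return [by_id[i] for i in ordered]

vector = [{"id": "a"}, {"id": "b"}, {"id": "c"}]
bm25 = [{"id": "b"}, {"id": "d"}]
print([d["id"] for d in rrf_fuse([vector, bm25])])  # → ['b', 'a', 'd', 'c']
```

"b" wins because it appears in both rankings, even though it tops neither — the behavior that makes RRF a popular fusion choice for hybrid search.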
async def hybrid_search(query: str, top_k: int = 10) -> list[dict]:
"""Combine vector and keyword search results."""
# Execute both searches in parallel
vector_results, bm25_results = await asyncio.gather(
vector_search(query, top_k=top_k),
keyword_search(query, top_k=top_k)
)
# Combine and deduplicate
seen_ids = set()
combined = []
for doc in vector_results + bm25_results:
if doc["id"] not in seen_ids:
seen_ids.add(doc["id"])
combined.append(doc)
return combined
Trade-offs
Pros
- Adaptive retrieval: Agent decides best strategy per query
- Higher accuracy: Grading and reranking improve relevance
- Self-correcting: Detects hallucinations and retrieves more evidence
- Query refinement: Iteratively improves retrieval quality
- Transparency: Clear trace of retrieval decisions
- Better for complex queries: Multi-step reasoning about what to retrieve
Cons
- Higher latency: Multiple LLM calls (grading, hallucination check, etc.)
- Higher cost: More LLM tokens than basic RAG
- Complexity: Many components (router, grader, reranker, checker)
- Over-engineering risk: Simple queries don't need agentic RAG
Comparison to Basic RAG
| Aspect | Agentic RAG | Basic RAG |
|---|---|---|
| Retrieval Strategy | Agent decides | Fixed |
| Accuracy | Higher (grading + reranking) | Lower |
| Latency | Higher (multiple LLM calls) | Lower (single pass) |
| Cost | Higher | Lower |
| Self-Correction | Yes (hallucination check) | No |
| Query Refinement | Yes (iterative) | No |
When to Use Agentic RAG
Use Agentic RAG when:
- Accuracy is critical (customer-facing, high-stakes)
- Queries are complex and multi-faceted
- You have a large knowledge base (grading is essential)
- Hallucinations are unacceptable
- You can afford higher latency/cost
Use Basic RAG when:
- Simple queries with clear intent
- Speed is critical
- Cost-sensitive application
- Knowledge base is small and high-quality
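One way to act on this guidance is a cheap routing heuristic in front of the two pipelines, falling back to the agentic loop only for complex queries. The marker words and length threshold below are illustrative assumptions:

```python
# Hypothetical router: simple single-fact queries go to basic RAG,
# complex multi-faceted ones to the agentic loop.

COMPLEX_MARKERS = {"compare", "why", "trend", "trends", "across", "summarize"}

def choose_pipeline(query: str) -> str:
    words = query.lower().split()
    if len(words) > 12 or COMPLEX_MARKERS & set(words):
        return "agentic_rag"
    return "basic_rag"

print(choose_pipeline("What is our refund policy?"))                       # → basic_rag
print(choose_pipeline("Compare Q3 and Q4 revenue trends across regions"))  # → agentic_rag
```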
Related Issues
- Tool-Calling Agent Pattern: Direct Function Calling #156 - Tool-Calling Agent (RAG uses tools for retrieval)
- Plan-and-Execute Agent Pattern: Two-Phase Planning #160 - Plan-and-Execute Agent (RAG can be a specialist)
- Router/Dispatcher Agent Pattern: Intent Classification & Routing #165 - Router/Dispatcher Agent (can route to RAG agent)
References
- LangChain Agentic RAG Documentation
- Goodbye Basic RAG — Hello Agents: The 2026 Playbook
- Weaviate: Building Elysia - Agentic RAG
- Agentic RAG with Elasticsearch and LangChain
- DataCamp: Agentic RAG Tutorial
- Building Agentic RAG with LangGraph (2026)
- Agentic RAG Enterprise Guide (2026)
Implementation Checklist
- Create pyworkflow/agents/rag.py with RAGAgent class
- Implement RouterNode (decide if retrieval needed)
- Implement GraderNode (relevance grading)
- Implement HallucinationCheckerNode (grounding check)
- Implement QueryRefinerNode (query refinement)
- Add hybrid search support (vector + BM25)
- Integrate reranking (cross-encoder)
- Add event types: AGENT_RETRIEVAL_*, AGENT_GRADING_*, AGENT_HALLUCINATION_CHECK
- Create @agent(pattern="rag") decorator
- Add max_retrieval_iterations safeguard
- Add tests with mock vector DB
- Document RAG agent in examples/
- Add integration tests with Weaviate/Chroma
- Support cited answers (document references)