Complete reference for all public classes and methods exported by graphrag_sdk.
- GraphRAG (Facade)
- Connection
- Providers
- Data Models
- Schema
- Ingestion Strategies
- Ingestion Pipeline
- Retrieval Strategies
- Reranking Strategies
- Storage
- Context
- Exceptions
The main entry point. Three primary operations: ingest(), retrieve(), and completion().
from graphrag_sdk import GraphRAGGraphRAG(
connection: FalkorDBConnection | ConnectionConfig,
llm: LLMInterface,
embedder: Embedder,
schema: GraphSchema | None = None,
retrieval_strategy: RetrievalStrategy | None = None,
)| Parameter | Type | Default | Description |
|---|---|---|---|
connection |
FalkorDBConnection | ConnectionConfig |
required | Database connection or config to create one |
llm |
LLMInterface |
required | LLM provider |
embedder |
Embedder |
required | Embedding provider |
schema |
GraphSchema | None |
None |
Schema constraints for extraction (empty = unconstrained) |
retrieval_strategy |
RetrievalStrategy | None |
None |
Default retrieval strategy (uses MultiPathRetrieval if None) |
Public attributes: llm, embedder, schema, graph_store, vector_store
async def ingest(
source: str | list[str],
*,
text: str | None = None,
loader: LoaderStrategy | None = None,
chunker: ChunkingStrategy | None = None,
extractor: ExtractionStrategy | None = None,
resolver: ResolutionStrategy | None = None,
max_concurrent: int = 3,
ctx: Context | None = None,
) -> IngestionResult | list[IngestionResult]Build a knowledge graph from one or more sources. Auto-detects loader from file extension. When a list of sources is provided, documents are ingested in parallel with bounded concurrency.
| Parameter | Type | Default | Description |
|---|---|---|---|
source |
str | list[str] |
required | File path (or list of paths) for ingestion |
text |
str | None |
None |
Raw text (single source only; skips loader) |
loader |
LoaderStrategy | None |
None |
Custom loader (auto-detect if None) |
chunker |
ChunkingStrategy | None |
None |
Custom chunker (FixedSizeChunking(1000) if None) |
extractor |
ExtractionStrategy | None |
None |
Custom extractor (GraphExtraction if None) |
resolver |
ResolutionStrategy | None |
None |
Custom resolver (ExactMatchResolution if None) |
max_concurrent |
int |
3 |
Max parallel ingestions (list input only) |
ctx |
Context | None |
None |
Execution context |
Returns: IngestionResult for a single source, list[IngestionResult] for multiple sources.
async def retrieve(
question: str,
*,
strategy: RetrievalStrategy | None = None,
reranker: RerankingStrategy | None = None,
ctx: Context | None = None,
) -> RetrieverResultRetrieve context from the knowledge graph without generating an answer. Use this to inspect retrieved context or pass it to your own LLM.
| Parameter | Type | Default | Description |
|---|---|---|---|
question |
str |
required | The user's question |
strategy |
RetrievalStrategy | None |
None |
Override retrieval strategy |
reranker |
RerankingStrategy | None |
None |
Optional reranking after retrieval |
ctx |
Context | None |
None |
Execution context |
Returns: RetrieverResult
async def completion(
question: str,
*,
history: list[ChatMessage | dict[str, str]] | None = None,
strategy: RetrievalStrategy | None = None,
reranker: RerankingStrategy | None = None,
prompt_template: str | None = None,
return_context: bool = False,
ctx: Context | None = None,
) -> RagResultFull RAG pipeline: retrieve context and generate an answer. When history is provided, messages are passed natively to the LLM provider's multi-turn chat API.
| Parameter | Type | Default | Description |
|---|---|---|---|
question |
str |
required | The user's question |
history |
list[ChatMessage | dict] | None |
None |
Conversation history (see below) |
strategy |
RetrievalStrategy | None |
None |
Override retrieval strategy |
reranker |
RerankingStrategy | None |
None |
Optional reranking after retrieval |
prompt_template |
str | None |
None |
Custom prompt for single-turn (must contain {context} and {question}). Ignored when history is provided. |
return_context |
bool |
False |
Include retriever results in output |
ctx |
Context | None |
None |
Execution context |
Returns: RagResult
Conversation history:
History accepts a list of ChatMessage objects or plain dicts with role and content keys. Supported roles: "system", "user", "assistant". Invalid roles raise ValueError.
from graphrag_sdk import ChatMessage
# Using ChatMessage objects
result = await rag.completion(
"What happened next?",
history=[
ChatMessage(role="user", content="Who is Alice?"),
ChatMessage(role="assistant", content="Alice is an engineer."),
],
)
# Using plain dicts
result = await rag.completion(
"Tell me more.",
history=[
{"role": "user", "content": "What is Acme?"},
{"role": "assistant", "content": "A tech company."},
],
)When history is provided, completion() builds a native messages list: [system_prompt, *history, user_question] and calls LLMInterface.ainvoke_messages(). Without history, it uses the single-turn ainvoke() path.
async def query(question: str, **kwargs) -> RagResultDeprecated. Use completion() for the full RAG pipeline or retrieve() for retrieval-only. Emits a DeprecationWarning and delegates to completion().
async def deduplicate_entities(
*,
fuzzy: bool = False,
similarity_threshold: float = 0.9,
batch_size: int = 500,
) -> intPost-ingestion entity deduplication. Groups entities by (normalized name, label) to prevent cross-type merging (e.g. Person "Paris" and Location "Paris" stay separate).
- Phase 1 (always): Exact name match -- keeps longest description, remaps RELATES and MENTIONED_IN edges, deletes duplicates.
- Phase 2 (optional,
fuzzy=True): Embedding-based -- embeds entity names, finds near-duplicates by cosine similarity.
Call once after all documents are ingested.
Returns: Number of duplicate entities merged.
async def finalize() -> dict[str, Any]Run all post-ingestion steps after all documents are ingested. Bundles:
deduplicate_entities()-- global exact-name dedupbackfill_entity_embeddings()-- name-only embeddingsembed_relationships()-- fact text embeddings on RELATES edgesensure_indices()-- all indexes
Returns: Dict with counts: entities_deduplicated, entities_embedded, relationships_embedded, indexes.
def retrieve_sync(question: str, **kwargs) -> RetrieverResult
def completion_sync(question: str, **kwargs) -> RagResult
def ingest_sync(source: str | list[str], **kwargs) -> IngestionResult | list[IngestionResult]
def finalize_sync() -> dict[str, Any]
def query_sync(question: str, **kwargs) -> RagResult # deprecatedConvenience methods that run the async versions in asyncio.run().
from graphrag_sdk import ConnectionConfig, FalkorDBConnection@dataclass
class ConnectionConfig:
host: str = "localhost"
port: int = 6379
username: str | None = None
password: str | None = None
graph_name: str = "knowledge_graph"
max_connections: int = 16
retry_count: int = 3
retry_delay: float = 1.0
query_timeout_ms: int = 10_000
pool_timeout: int = 30FalkorDBConnection(config: ConnectionConfig | None = None)| Method | Description |
|---|---|
await conn.query(cypher, params=None, timeout=None) |
Execute a Cypher query |
await conn.close() |
Close the connection pool |
conn.graph |
Lazy property returning the AsyncGraph handle |
from graphrag_sdk import LLMInterface, Embedder, LLMBatchItem
from graphrag_sdk import LiteLLM, LiteLLMEmbedder, OpenRouterLLM, OpenRouterEmbedderLLMInterface(model_name: str, model_params: dict | None = None, max_concurrency: int = 12)| Method | Signature | Description |
|---|---|---|
invoke |
(prompt: str, **kwargs) -> LLMResponse |
Sync text generation (abstract) |
ainvoke |
(prompt: str, *, max_retries=3, **kwargs) -> LLMResponse |
Async with retry + backoff |
ainvoke_messages |
(messages: list[ChatMessage], *, max_retries=3, **kwargs) -> LLMResponse |
Multi-turn native messages (see below) |
invoke_with_model |
(prompt: str, response_model: Type[BaseModel], **kwargs) -> BaseModel |
Structured output |
ainvoke_with_model |
(prompt: str, response_model: Type[BaseModel], *, max_retries=3) -> BaseModel |
Async structured output |
abatch_invoke |
(prompts: list[str], *, max_concurrency=None, max_retries=3) -> list[LLMBatchItem] |
Concurrent batch |
ainvoke_messages() is used by completion() when conversation history is provided. The default implementation concatenates messages into a single prompt string and calls ainvoke(), so custom providers work without changes. LiteLLM and OpenRouterLLM override this with native multi-turn implementations.
| Method | Signature | Description |
|---|---|---|
model_name |
@property -> str |
Embedding model identifier (abstract) |
embed_query |
(text: str, **kwargs) -> list[float] |
Single text embedding (abstract) |
aembed_query |
(text: str, **kwargs) -> list[float] |
Async single (default: thread pool) |
embed_documents |
(texts: list[str], **kwargs) -> list[list[float]] |
Batch (default: sequential) |
aembed_documents |
(texts: list[str], **kwargs) -> list[list[float]] |
Async batch (default: thread pool) |
@dataclass
class LLMBatchItem:
index: int
response: LLMResponse | None = None
error: Exception | None = None
@property
def ok(self) -> bool # True if response is not NoneLiteLLM(model: str, *, api_key=None, api_base=None, api_version=None, temperature=0.0, max_tokens=None, **kwargs)LiteLLMEmbedder(model: str, *, api_key=None, api_base=None, api_version=None, **kwargs)OpenRouterLLM(model: str, *, api_key=None, temperature=0.0, max_tokens=None, extra_headers=None)OpenRouterEmbedder(model: str, *, api_key=None, extra_headers=None)All models extend DataModel (Pydantic BaseModel with extra="allow").
from graphrag_sdk import GraphNode, GraphRelationship, GraphData
from graphrag_sdk import TextChunk, TextChunks
from graphrag_sdk import DocumentInfo, DocumentOutput
from graphrag_sdk import IngestionResult, RagResult
from graphrag_sdk import RetrieverResult, RetrieverResultItem
from graphrag_sdk import ResolutionResult, SearchTypeclass GraphNode(DataModel):
id: str # Unique identifier
label: str # Node label (Person, Place, etc.)
properties: dict[str, Any] = {} # Key-value properties
embedding_properties: dict[str, list[float]] | None = Noneclass GraphRelationship(DataModel):
start_node_id: str
end_node_id: str
type: str # Edge type (RELATES for all extracted rels)
properties: dict[str, Any] = {} # Includes rel_type, fact, keywords, etc.
embedding_properties: dict[str, list[float]] | None = Noneclass GraphData(DataModel):
nodes: list[GraphNode] = []
relationships: list[GraphRelationship] = []class TextChunk(DataModel):
text: str
index: int
metadata: dict[str, Any] = {}
uid: str # Auto-generated UUIDclass TextChunks(DataModel):
chunks: list[TextChunk] = []class DocumentInfo(DataModel):
path: str | None = None
uid: str # Auto-generated UUID
metadata: dict[str, Any] = {}class DocumentOutput(DataModel):
text: str
document_info: DocumentInfo = DocumentInfo()class IngestionResult(DataModel):
document_info: DocumentInfo = DocumentInfo()
nodes_created: int = 0
relationships_created: int = 0
chunks_indexed: int = 0
metadata: dict[str, Any] = {}class RagResult(DataModel):
answer: str
retriever_result: RetrieverResult | None = None # Populated when return_context=True
metadata: dict[str, Any] = {} # Contains model, num_context_items, strategyclass RetrieverResult(DataModel):
items: list[RetrieverResultItem] = []
metadata: dict[str, Any] = {}class RetrieverResultItem(DataModel):
content: str
metadata: dict[str, Any] = {}
score: float | None = Noneclass ResolutionResult(DataModel):
nodes: list[GraphNode] = []
relationships: list[GraphRelationship] = []
merged_count: int = 0from graphrag_sdk import ChatMessage
class ChatMessage(DataModel):
role: Literal["system", "user", "assistant"]
content: str
def to_dict(self) -> dict[str, str] # {"role": ..., "content": ...}Validated message type for multi-turn conversations. Used by completion(history=...) and LLMInterface.ainvoke_messages(). Invalid roles raise a validation error on construction.
LLMMessage is a backward-compatible alias for ChatMessage.
class LLMResponse(DataModel):
content: str
tool_calls: list[dict[str, Any]] | None = Noneclass SearchType(str, Enum):
VECTOR = "vector"
FULLTEXT = "fulltext"
HYBRID = "hybrid"class ExtractedEntity(DataModel):
name: str
type: str
description: str = ""
source_chunk_ids: list[str] = []
class ExtractedRelation(DataModel):
source: str
target: str
type: str
keywords: str = ""
description: str = ""
weight: float = 1.0
source_chunk_ids: list[str] = []
class EntityMention(DataModel):
chunk_id: str
entity_id: str
class ExtractionOutput(DataModel):
entities: list[ExtractedEntity] = []
relations: list[ExtractedRelation] = []
mentions: list[EntityMention] = []def compute_entity_id(name: str, entity_type: str = "") -> strDeterministic entity ID from normalized name and optional type. When entity_type is provided, appends a __type suffix to prevent cross-type collisions (e.g. paris__person vs paris__location). Without entity_type, returns just the normalized name for backwards compatibility.
from graphrag_sdk import GraphSchema, EntityType, RelationTypeclass EntityType(DataModel):
label: str # e.g. "Person"
description: str | None = None # Helps LLM understand what to extract
properties: list[PropertyType] = [] # Optional property definitionsclass RelationType(DataModel):
label: str # e.g. "WORKS_AT"
description: str | None = None
patterns: list[tuple[str, str]] = [] # Allowed (source_label, target_label) pairsclass PropertyType(DataModel):
name: str
type: str = "STRING" # STRING, INTEGER, FLOAT, BOOLEAN, DATE, LIST
description: str | None = None
required: bool = Falseclass GraphSchema(DataModel):
entities: list[EntityType] = []
relations: list[RelationType] = []class LoaderStrategy(ABC):
@abstractmethod
async def load(self, source: str, ctx: Context) -> DocumentOutput: ...Built-in: TextLoader(encoding="utf-8"), PdfLoader()
class ChunkingStrategy(ABC):
@abstractmethod
async def chunk(self, text: str, ctx: Context) -> TextChunks: ...Built-in: FixedSizeChunking(chunk_size=1000, chunk_overlap=100)
class ExtractionStrategy(ABC):
@abstractmethod
async def extract(self, chunks: TextChunks, schema: GraphSchema, ctx: Context) -> GraphData: ...Built-in:
GraphExtraction(llm, *, entity_extractor=None, coref_resolver=None, entity_types=None, max_concurrency=None)
Entity Extractors (step 1 backends for GraphExtraction):
GLiNERExtractor(threshold=0.75, model_name="urchade/gliner_medium-v2.1")-- default, local NERLLMExtractor(llm, threshold=0.75)-- LLM-based NER- Subclass
EntityExtractorfor custom backends
class ResolutionStrategy(ABC):
@abstractmethod
async def resolve(self, graph_data: GraphData, ctx: Context) -> ResolutionResult: ...Built-in:
ExactMatchResolution(resolve_property="id")DescriptionMergeResolution(llm=None, force_summary_threshold=3, max_summary_tokens=500)
from graphrag_sdk import IngestionPipelineIngestionPipeline(
loader: LoaderStrategy,
chunker: ChunkingStrategy,
extractor: ExtractionStrategy,
resolver: ResolutionStrategy,
graph_store: GraphStore,
vector_store: VectorStore,
schema: GraphSchema | None = None,
embedder: Embedder | None = None,
)| Method | Signature | Description |
|---|---|---|
run |
(source, ctx=None, *, text=None, document_info=None) -> IngestionResult |
Execute the full 9-step pipeline |
Uses the Template Method pattern.
class RetrievalStrategy(ABC):
def __init__(self, graph_store=None, vector_store=None): ...
async def search(self, query, ctx=None, **kwargs) -> RetrieverResult: ...
@abstractmethod
async def _execute(self, query, ctx, **kwargs) -> RawSearchResult: ...LocalRetrieval(graph_store, vector_store, embedder, top_k=5, include_entities=True)MultiPathRetrieval(
graph_store, vector_store, embedder, llm,
*,
chunk_top_k=15,
max_entities=30,
max_relationships=20,
rel_top_k=15,
keyword_limit=10,
)class RerankingStrategy(ABC):
@abstractmethod
async def rerank(self, query, result: RetrieverResult, ctx: Context) -> RetrieverResult: ...CosineReranker(embedder: Embedder, top_k: int = 15)from graphrag_sdk import GraphStore, VectorStoreGraphStore(connection: FalkorDBConnection)| Method | Signature | Description |
|---|---|---|
upsert_nodes |
(nodes: list[GraphNode]) -> int |
Batched MERGE, returns count |
upsert_relationships |
(rels: list[GraphRelationship]) -> int |
Batched MERGE with label hints |
get_connected_entities |
(chunk_id, max_hops=1) -> list[dict] |
N-hop entity traversal |
query_raw |
(cypher, params=None) -> Any |
Raw Cypher execution |
get_statistics |
() -> dict |
Node/edge counts, types, density |
delete_all |
() -> None |
Delete all data |
VectorStore(connection, embedder=None, index_name="chunk_embeddings", embedding_dimension=256, similarity_function="cosine")| Method | Signature | Description |
|---|---|---|
create_vector_index |
(label="Chunk", property="embedding") -> None |
Create vector index |
create_entity_vector_index |
() -> None |
Create entity vector index |
create_fulltext_index |
(label="Chunk", *properties) -> None |
Create fulltext index |
ensure_indices |
() -> None |
Create all standard indices |
index_chunks |
(chunks: TextChunks) -> int |
Embed and store chunk vectors |
backfill_entity_embeddings |
() -> int |
Embed all entities missing vectors |
embed_relationships |
() -> int |
Embed fact text on RELATES edges |
search |
(query_vector, top_k=5, label="Chunk") -> list[dict] |
Vector similarity search |
search_entities |
(query_vector, top_k=5) -> list[dict] |
Entity vector search |
search_relationships |
(query_vector, top_k=15) -> list[dict] |
RELATES edge vector search |
fulltext_search |
(query, top_k=5, label="Chunk") -> list[dict] |
Fulltext keyword search |
from graphrag_sdk import ContextExecution context for logging and budget tracking.
Context(tenant_id: str = "default", latency_budget_ms: float = 60000.0)| Method/Property | Description |
|---|---|
ctx.log(message, log_level=logging.INFO) |
Log a message |
ctx.budget_exceeded |
True if elapsed time > latency_budget_ms |
from graphrag_sdk import GraphRAGError| Exception | When Raised |
|---|---|
GraphRAGError |
Base exception for all SDK errors |
LoaderError |
File loading failures |
ChunkingError |
Text splitting failures |
ExtractionError |
LLM extraction or JSON parsing failures |
ResolutionError |
Entity deduplication failures |
RetrieverError |
Retrieval execution failures |
DatabaseError |
FalkorDB connection or query failures |
IngestionError |
Pipeline orchestration failures |
SchemaValidationError |
Schema constraint violations |