Merged

18 commits
c888b96  Update langfuse dependency to version 3.3.4 across multiple services … (a-klos, Sep 4, 2025)
de9854a  chore: merge main (a-klos, Oct 7, 2025)
c7676fd  chore: add esbuild as a dependency in package.json and package-lock.json (a-klos, Oct 7, 2025)
a1683ec  chore: update Tiltfile to simplify ignore patterns and enhance k3d cl… (a-klos, Oct 7, 2025)
04990a4  Merge branch 'main' into fix/bitnami-issues (a-klos, Oct 7, 2025)
bcb6edd  chore: remove commented-out Helm chart installation for ingress-nginx (a-klos, Oct 7, 2025)
35cf552  Update infrastructure/server-setup/base-setup/Chart.yaml (a-klos, Oct 7, 2025)
bfcebef  refactor: streamline k3d cluster creation logic and improve Helm char… (a-klos, Oct 8, 2025)
7187675  Merge branch 'fix/bitnami-issues' of github.com:stackitcloud/rag-temp… (a-klos, Oct 8, 2025)
23b4b88  fix: correct formatting of IGNORE_BASE list in Tiltfile (a-klos, Oct 8, 2025)
fcc14c9  feat: enhance CompositeRetriever with concurrent retrieval and early … (a-klos, Oct 8, 2025)
8bfc8bd  feat: update retriever settings to introduce canonical total_k_docume… (a-klos, Oct 9, 2025)
0984786  feat: enhance CompositeRetriever with new reranker settings and summa… (a-klos, Oct 9, 2025)
b736057  refactor: improve code formatting and readability across multiple files (a-klos, Oct 9, 2025)
083e2c7  chore: Merge branch 'main' into chore/retriever-performance-improvement (a-klos, Dec 16, 2025)
1749db7  chore: update RETRIEVER_TOTAL_K_DOCUMENTS to 7 in backend configuration (a-klos, Dec 16, 2025)
79af726  refactor: streamline return statements and improve test document crea… (a-klos, Dec 16, 2025)
e2104a2  refactor: enhance docstrings for mock classes and unit tests in retri… (a-klos, Dec 16, 2025)
7 changes: 4 additions & 3 deletions infrastructure/README.md
@@ -291,7 +291,8 @@ backend:
   retriever:
     RETRIEVER_THRESHOLD: 0.3
     RETRIEVER_K_DOCUMENTS: 10
-    RETRIEVER_TOTAL_K: 7
+    # Canonical global cap (previously RETRIEVER_TOTAL_K / RETRIEVER_OVERALL_K_DOCUMENTS)
+    RETRIEVER_TOTAL_K_DOCUMENTS: 7
     RETRIEVER_SUMMARY_THRESHOLD: 0.3
     RETRIEVER_SUMMARY_K_DOCUMENTS: 10
     RETRIEVER_TABLE_THRESHOLD: 0.3
@@ -489,7 +490,7 @@ Afterwards, the services are accessible from [http://rag.localhost](http://rag.l

 Note: The command above has only been tested on *Ubuntu 22.04 LTS*.

-On *Windows* you can adjust the hosts file as described [here](https://docs.digitalocean.com/products/paperspace/machines/how-to/edit-windows-hosts-file/).
+On *Windows* you can adjust the hosts file as described in the DigitalOcean guide on [editing the Windows hosts file](https://docs.digitalocean.com/products/paperspace/machines/how-to/edit-windows-hosts-file/).

 ### 2.2 Production Setup Instructions

@@ -499,7 +500,7 @@ For deployment of the *NGINX Ingress Controller* and a cert-manager, the followi

 [base-setup](server-setup/base-setup/Chart.yaml)

-The email [here](server-setup/base-setup/templates/cert-issuer.yaml) should be changed from `<replace@me.com>` to a real email address.
+The email in the [cert-issuer template](server-setup/base-setup/templates/cert-issuer.yaml) should be changed from `<replace@me.com>` to a real email address.

 ## 3. Contributing
4 changes: 3 additions & 1 deletion infrastructure/rag/values.yaml
@@ -167,7 +167,8 @@ backend:
   retriever:
     RETRIEVER_THRESHOLD: 0.3
     RETRIEVER_K_DOCUMENTS: 10
-    RETRIEVER_TOTAL_K: 7
+    # Canonical global cap across all retrievers. Replaces legacy RETRIEVER_TOTAL_K / RETRIEVER_OVERALL_K_DOCUMENTS
+    RETRIEVER_TOTAL_K_DOCUMENTS: 7
     RETRIEVER_SUMMARY_THRESHOLD: 0.3
     RETRIEVER_SUMMARY_K_DOCUMENTS: 10
     RETRIEVER_TABLE_THRESHOLD: 0.3
@@ -220,6 +221,7 @@ backend:
   reranker:
     RERANKER_K_DOCUMENTS: 5
     RERANKER_MIN_RELEVANCE_SCORE: 0.001
+    RERANKER_ENABLED: true
   chatHistory:
     CHAT_HISTORY_LIMIT: 4
     CHAT_HISTORY_REVERSE: true
7 changes: 6 additions & 1 deletion libs/rag-core-api/src/rag_core_api/dependency_container.py
@@ -130,7 +130,9 @@ class DependencyContainer(DeclarativeContainer):
         vectorstore=vectorstore,
     )

-    flashrank_reranker = Singleton(FlashrankRerank, top_n=reranker_settings.k_documents)
+    flashrank_reranker = Singleton(
+        FlashrankRerank, top_n=reranker_settings.k_documents, score_threshold=reranker_settings.min_relevance_score
+    )
     reranker = Singleton(FlashrankReranker, flashrank_reranker)

     information_pieces_uploader = Singleton(DefaultInformationPiecesUploader, vector_database)
@@ -170,6 +172,9 @@ class DependencyContainer(DeclarativeContainer):
         CompositeRetriever,
         List(image_retriever, table_retriever, text_retriever, summary_retriever),
         reranker,
+        reranker_settings.enabled,
+        retriever_settings.total_k_documents,
+        reranker_settings.k_documents,
     )

     information_piece_mapper = Singleton(InformationPieceMapper)
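For context on what the new `score_threshold` wiring buys: a reranker configured with both a `top_n` cap and a minimum relevance score behaves roughly like the sketch below. This is a simplified stand-in, not the FlashrankRerank implementation; the `Doc` type and scores are placeholders.

```python
from dataclasses import dataclass


@dataclass
class Doc:
    """Placeholder for a retrieved document carrying a reranker score."""

    id: str
    score: float


def rerank_prune(docs: list[Doc], top_n: int, score_threshold: float) -> list[Doc]:
    """Keep at most top_n documents and drop anything below the score threshold.

    Mirrors the contract implied by top_n=reranker_settings.k_documents and
    score_threshold=reranker_settings.min_relevance_score in the container above.
    """
    ranked = sorted(docs, key=lambda d: d.score, reverse=True)
    return [d for d in ranked[:top_n] if d.score >= score_threshold]


if __name__ == "__main__":
    docs = [Doc("a", 0.9), Doc("b", 0.0005), Doc("c", 0.4)]
    # With top_n=5 and min score 0.001, "b" is filtered out despite the generous cap.
    print([d.id for d in rerank_prune(docs, top_n=5, score_threshold=0.001)])  # ['a', 'c']
```

The effect of wiring `min_relevance_score` through is that near-zero matches no longer survive reranking just because the `top_n` budget was not exhausted.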
7 changes: 7 additions & 0 deletions libs/rag-core-api/src/rag_core_api/impl/graph/chat_graph.py
@@ -269,6 +269,13 @@ async def _retrieve_node(self, state: dict) -> dict:
             if document.metadata.get("type", ContentType.SUMMARY.value) != ContentType.SUMMARY.value
         ]

+        # If only summaries were retrieved (no concrete underlying documents), treat as "no documents"
+        if not information_pieces:
+            return {
+                self.ERROR_MESSAGES_KEY: [self._error_messages.no_documents_message],
+                self.FINISH_REASONS: ["No documents found"],
+            }
+
        response["information_pieces"] = information_pieces
        response["langchain_documents"] = retrieved_documents
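The guard is easiest to see in isolation. Below is a minimal sketch with stub metadata dicts; the `ContentType` stub and the `keep_non_summaries` helper are illustrative only, with the surrounding graph state simplified away:

```python
from enum import Enum


class ContentType(Enum):
    """Stub of the project's ContentType; only the SUMMARY member matters here."""

    SUMMARY = "summary"
    TEXT = "text"


def keep_non_summaries(retrieved: list[dict]) -> list[dict] | None:
    """Drop summary pieces; return None when nothing concrete remains."""
    pieces = [
        meta
        for meta in retrieved
        if meta.get("type", ContentType.SUMMARY.value) != ContentType.SUMMARY.value
    ]
    return pieces or None  # None stands in for the no_documents error path above


print(keep_non_summaries([{"type": "summary"}]))                    # None -> error path
print(keep_non_summaries([{"type": "summary"}, {"type": "text"}]))  # [{'type': 'text'}]
```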
184 changes: 166 additions & 18 deletions libs/rag-core-api/src/rag_core_api/impl/retriever/composite_retriever.py
@@ -1,8 +1,19 @@
-"""Module for the CompositeRetriever class."""
+"""Module for the CompositeRetriever class.
+
+Performance notes / improvements (2025-10):
+- Retriever invocations are now executed concurrently via ``asyncio.gather`` instead of
+  sequential awaits inside a for-loop. This reduces end-to-end latency roughly to the
+  slowest individual retriever call instead of the sum of all.
+- Duplicate filtering now uses an O(1) set membership check instead of rebuilding a list
+  comprehension for every candidate (previously O(n^2)).
+- Early pruning hook (``_total_retrieved_k_documents``): if provided, it allows trimming
+  the merged candidate list before an optional reranker is invoked.
+"""

 import logging
+import asyncio
 from copy import deepcopy
-from typing import Any, Optional
+from typing import Any, Optional, Iterable

 from langchain_core.documents import Document
 from langchain_core.runnables import RunnableConfig
@@ -22,6 +33,9 @@ def __init__(
         self,
         retrievers: list[RetrieverQuark],
         reranker: Optional[Reranker],
+        reranker_enabled: bool,
+        total_retrieved_k_documents: int | None = None,
+        reranker_k_documents: int | None = None,
         **kwargs,
     ):
         """
@@ -33,12 +47,22 @@ def __init__(
             A list of retriever quarks to be used by the composite retriever.
         reranker : Optional[Reranker]
             An optional reranker to rerank the retrieved results.
+        reranker_enabled : bool
+            A flag indicating whether the reranker is enabled.
+        total_retrieved_k_documents : int | None
+            The total number of documents to retrieve (default None, meaning no limit).
+        reranker_k_documents : int | None
+            The number of documents to retrieve for the reranker (default None, meaning no limit).
         **kwargs : dict
             Additional keyword arguments to be passed to the superclass initializer.
         """
         super().__init__(**kwargs)
         self._reranker = reranker
         self._retrievers = retrievers
+        # Optional global cap (before reranking) on merged candidates. If None, no cap applied.
+        self._total_retrieved_k_documents = total_retrieved_k_documents
+        self._reranker_k_documents = reranker_k_documents
+        self._reranker_enabled = reranker_enabled

     def verify_readiness(self) -> None:
         """
@@ -67,7 +91,7 @@ async def ainvoke(
         retriever_input : str
             The input string to be processed by the retrievers.
         config : Optional[RunnableConfig]
-            Configuration for the retrievers (default None).
+            Configuration for the retrievers and reranker (default None).
         **kwargs : Any
             Additional keyword arguments.

@@ -83,24 +107,148 @@ async def ainvoke(
         - Duplicate entries are removed based on their metadata ID.
         - If a reranker is available, the results are further processed by the reranker.
         """
-        results = []
         if config is None:
             config = RunnableConfig(metadata={"filter_kwargs": {}})
-        for retriever in self._retrievers:
-            tmp_config = deepcopy(config)
-            results += await retriever.ainvoke(retriever_input, config=tmp_config)
-
-        # remove summaries
-        results = [x for x in results if x.metadata["type"] != ContentType.SUMMARY.value]
-
-        # remove duplicated entries
-        return_val = []
-        for result in results:
-            if result.metadata["id"] in [x.metadata["id"] for x in return_val]:
-                continue
-            return_val.append(result)
-
-        if self._reranker and results:
-            return_val = await self._reranker.ainvoke((return_val, retriever_input), config=config)
-
-        return return_val
+
+        # Run all retrievers concurrently instead of sequentially.
+        tasks = [r.ainvoke(retriever_input, config=deepcopy(config)) for r in self._retrievers]
+        retriever_outputs = await asyncio.gather(*tasks, return_exceptions=False)
+        # Flatten
+        results: list[Document] = [doc for group in retriever_outputs for doc in group]
+
+        summary_docs: list[Document] = [d for d in results if d.metadata.get("type") == ContentType.SUMMARY.value]
+
+        results = self._use_summaries(summary_docs, results)
+
+        return_val = self._remove_duplicates(results)
+
+        return_val = self._early_pruning(return_val)
+
+        return await self._arerank_pruning(return_val, retriever_input, config)
+
+    def _use_summaries(self, summary_docs: list[Document], results: list[Document]) -> list[Document]:
+        """Utilize summary documents to enhance retrieval results.
+
+        Parameters
+        ----------
+        summary_docs : list[Document]
+            A list of summary documents to use.
+        results : list[Document]
+            A list of retrieval results to enhance.
+
+        Returns
+        -------
+        list[Document]
+            The enhanced list of documents.
+        """
+        try:
+            # Collect existing ids for fast membership tests
+            existing_ids: set[str] = {d.metadata.get("id") for d in results}
+
+            # Gather related ids not yet present
+            missing_related_ids: set[str] = set()
+            for sdoc in summary_docs:
+                related_list: Iterable[str] = sdoc.metadata.get("related", [])
+                missing_related_ids.update(rid for rid in related_list if rid and rid not in existing_ids)
+
+            if missing_related_ids:
+                # Heuristic: use the first retriever's underlying vector database for lookup.
+                # All quarks share the same vector database instance in current design.
+                vector_db = None
+                if self._retrievers:
+                    # Access protected member as an implementation detail – acceptable within package.
+                    vector_db = getattr(self._retrievers[0], "_vector_database", None)
+                if vector_db and hasattr(vector_db, "get_documents_by_ids"):
+                    try:
+                        expanded_docs: list[Document] = vector_db.get_documents_by_ids(list(missing_related_ids))
+                        # Merge while preserving original order precedence (append new ones)
+                        results.extend(expanded_docs)
+                        existing_ids.update(d.metadata.get("id") for d in expanded_docs)
+                        logger.debug(
+                            "Summary expansion added %d underlying documents (from %d summaries).",
+                            len(expanded_docs),
+                            len(summary_docs),
+                        )
+                    except Exception:
+                        logger.exception("Failed to expand summary related documents.")
+                else:
+                    logger.debug("Vector database does not expose get_documents_by_ids; skipping summary expansion.")
+        finally:
+            # Remove summaries after expansion step
+            results = [x for x in results if x.metadata.get("type") != ContentType.SUMMARY.value]
+        return results
+
+    def _remove_duplicates(self, documents: list[Document]) -> list[Document]:
+        """Remove duplicate documents from a list based on their IDs.
+
+        Parameters
+        ----------
+        documents : list[Document]
+            The list of documents to filter.
+
+        Returns
+        -------
+        list[Document]
+            The filtered list of documents with duplicates removed.
+        """
+        seen_ids = set()
+        unique_docs = []
+        for doc in documents:
+            doc_id = doc.metadata.get("id")
+            if doc_id not in seen_ids:
+                seen_ids.add(doc_id)
+                unique_docs.append(doc)
+        return unique_docs
+
+    def _early_pruning(self, documents: list[Document]) -> list[Document]:
+        """Prune documents early based on certain criteria.
+
+        Parameters
+        ----------
+        documents : list[Document]
+            The list of documents to prune.
+
+        Returns
+        -------
+        list[Document]
+            The pruned list of documents.
+        """
+        # Optional early global pruning (only if configured and more than total_k)
+        if self._total_retrieved_k_documents is not None and len(documents) > self._total_retrieved_k_documents:
+            # If score metadata exists, use it to prune; otherwise keep ordering as-is.
+            if all("score" in d.metadata for d in documents):
+                documents.sort(key=lambda d: d.metadata["score"], reverse=True)
+            return documents[: self._total_retrieved_k_documents]
+        return documents
+
+    async def _arerank_pruning(
+        self, documents: list[Document], retriever_input: dict, config: Optional[RunnableConfig] = None
+    ) -> list[Document]:
+        """Prune documents by reranker.
+
+        Parameters
+        ----------
+        documents : list[Document]
+            The list of documents to prune.
+        retriever_input : dict
+            The input to the retriever.
+        config : Optional[RunnableConfig]
+            Configuration for the retrievers and reranker (default None).
+
+        Returns
+        -------
+        list[Document]
+            The pruned list of documents.
+        """
+        if (
+            self._reranker_k_documents is not None
+            and len(documents) > self._reranker_k_documents
+            and self._reranker_enabled
+        ):
+            # Only invoke the reranker if it is enabled and there are more docs than it will output.
+            try:
+                documents = await self._reranker.ainvoke((documents, retriever_input), config=config)
+            except Exception:  # pragma: no cover - fail soft; return unreranked if reranker errors
+                logger.exception("Reranker failed; returning unreranked results.")
+        return documents
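The latency claim in the module docstring above (total time roughly equals the slowest retriever rather than the sum) comes straight from `asyncio.gather`, and the set-based dedup is the linear-time replacement for the old quadratic membership scan. A self-contained sketch with fake retrievers; the names, delays, and dict-shaped documents are illustrative only:

```python
import asyncio
import time


async def fake_retriever(name: str, delay: float) -> list[dict]:
    """Stand-in for a retriever quark; returns documents with metadata ids."""
    await asyncio.sleep(delay)
    return [{"id": f"{name}-1"}, {"id": "shared"}]


async def main() -> None:
    start = time.perf_counter()
    # Concurrent fan-out: wall time tracks the slowest call (~0.3s), not the sum (~0.6s).
    groups = await asyncio.gather(
        fake_retriever("text", 0.3),
        fake_retriever("table", 0.2),
        fake_retriever("image", 0.1),
    )
    results = [doc for group in groups for doc in group]

    # Set-based dedup: O(1) membership per candidate instead of rescanning the kept list.
    seen: set[str] = set()
    unique = []
    for doc in results:
        if doc["id"] not in seen:
            seen.add(doc["id"])
            unique.append(doc)

    print(f"{len(unique)} unique docs in {time.perf_counter() - start:.2f}s")


asyncio.run(main())
```

Running this prints four unique documents (the three per-retriever ids plus one "shared") in about 0.3 seconds, which is the behavior the concurrent `ainvoke` relies on.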
@@ -12,6 +12,10 @@ class RerankerSettings(BaseSettings):
     ----------
     k_documents : int
         The number of documents to return after reranking (default 5).
+    min_relevance_score : float
+        Minimum relevance threshold to return (default 0.001).
+    enabled : bool
+        A flag indicating whether the reranker is enabled (default True).
     """

     class Config:
@@ -21,3 +25,5 @@ class Config:
         case_sensitive = False

     k_documents: int = Field(default=5)
+    min_relevance_score: float = Field(default=0.001)
+    enabled: bool = Field(default=True)
@@ -1,6 +1,14 @@
-"""Module that contains settings regarding the retriever."""
+"""Module that contains settings regarding the retriever.
+
+Notes
+-----
+`total_k_documents` is the canonical global cap across all retrievers. It replaces the
+previous environment variable names `RETRIEVER_TOTAL_K` and `RETRIEVER_OVERALL_K_DOCUMENTS`.
+For backward compatibility, those legacy names are still accepted if the canonical
+`RETRIEVER_TOTAL_K_DOCUMENTS` is not set.
+"""

-from pydantic import Field
+from pydantic import Field, AliasChoices
 from pydantic_settings import BaseSettings

@@ -11,7 +19,7 @@ class RetrieverSettings(BaseSettings):
         The threshold value for the retriever (default 0.5).
     k_documents : int
         The number of documents to retrieve (default 10).
-    total_k : int
+    total_k_documents : int
         The total number of documents (default 10).
     table_threshold : float
         The threshold value for table retrieval (default 0.37).
@@ -35,10 +43,19 @@ class Config:

     threshold: float = Field(default=0.5)
     k_documents: int = Field(default=10)
-    total_k: int = Field(default=10)
     table_threshold: float = Field(default=0.37)
     table_k_documents: int = Field(default=10)
     summary_threshold: float = Field(default=0.5)
     summary_k_documents: int = Field(default=10)
     image_threshold: float = Field(default=0.5)
     image_k_documents: int = Field(default=10)
+    # Canonical global cap (previously RETRIEVER_TOTAL_K / RETRIEVER_OVERALL_K_DOCUMENTS).
+    # Accept legacy env var names as fallbacks via validation alias choices.
+    total_k_documents: int = Field(
+        default=10,
+        validation_alias=AliasChoices(
+            "TOTAL_K_DOCUMENTS",  # canonical -> RETRIEVER_TOTAL_K_DOCUMENTS
+            "TOTAL_K",  # legacy -> RETRIEVER_TOTAL_K
+            "OVERALL_K_DOCUMENTS",  # legacy -> RETRIEVER_OVERALL_K_DOCUMENTS
+        ),
+    )
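One caveat worth keeping in mind when reproducing this alias-fallback pattern: in current pydantic-settings v2 releases, `env_prefix` is not applied to fields that carry a user-defined validation alias, so alias names are matched against the environment verbatim. The minimal self-contained sketch below (a standalone class, not the project's `RetrieverSettings`) therefore spells out the full variable names in `AliasChoices`:

```python
import os

from pydantic import AliasChoices, Field
from pydantic_settings import BaseSettings, SettingsConfigDict


class CapSettings(BaseSettings):
    """Minimal stand-in illustrating canonical + legacy env var fallbacks."""

    model_config = SettingsConfigDict(env_prefix="RETRIEVER_", case_sensitive=False)

    # env_prefix is not applied to aliased fields, so each choice is a full name.
    total_k_documents: int = Field(
        default=10,
        validation_alias=AliasChoices(
            "RETRIEVER_TOTAL_K_DOCUMENTS",  # canonical
            "RETRIEVER_TOTAL_K",  # legacy
            "RETRIEVER_OVERALL_K_DOCUMENTS",  # legacy
        ),
    )


os.environ["RETRIEVER_TOTAL_K"] = "7"  # only the legacy name is set
print(CapSettings().total_k_documents)  # -> 7; the first matching alias wins
```

`AliasChoices` resolves names in declaration order, so setting the canonical variable alongside a legacy one makes the canonical value win, which is exactly the migration behavior the module docstring describes.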