-
Notifications
You must be signed in to change notification settings - Fork 0
Refactor DataStore imports, knowledge paths, and vector-store build flow #10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,189 +1,180 @@ | ||
| from langchain_community.document_loaders import TextLoader | ||
| from __future__ import annotations | ||
|
|
||
| import logging | ||
| import os | ||
| import pickle | ||
| from pathlib import Path | ||
|
|
||
| from langchain.text_splitter import RecursiveCharacterTextSplitter | ||
| from langchain_community.document_loaders import DocusaurusLoader | ||
| from langchain_openai import OpenAIEmbeddings | ||
| from langchain_community.document_loaders import DocusaurusLoader, TextLoader | ||
| from langchain_community.vectorstores.faiss import FAISS | ||
| from utils.manager_tools import ManagerTools as mt | ||
| import pickle | ||
| import os | ||
| from langchain_openai import OpenAIEmbeddings | ||
|
|
||
| from assistants.utils.manager_tools import ManagerTools as mt | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class DataStore: | ||
| """ | ||
| A class responsible for managing data storage and retrieval for the chat application. | ||
|
|
||
| Attributes: | ||
| WEB (str): Constant representing web-based data source. | ||
| LOCAL (str): Constant representing local text-based data source. | ||
| SITE_URL (str): URL of the website. | ||
| site_datas_light (str): Path to light version of site data. | ||
| site_datas (str): Path to full site data. | ||
| LOCAL_PATH (str): Default local data path. | ||
| docs_pickle_path (str): Path to pickle file for storing documents. | ||
| origin (str): Data source origin. | ||
| """ | ||
| Manage document loading and vector store creation for the chat application. | ||
| """ | ||
|
|
||
| WEB = "web" | ||
| LOCAL = "text" | ||
| PICKLE = "pickel" | ||
| PICKLE = "pickle" | ||
| # Backward-compatibility alias | ||
| PICKEL = PICKLE | ||
|
|
||
| SITE_URL = "https://kobu.agency/" | ||
|
|
||
| site_datas_light = 'assistant/knowledge/data_store_files/default/site_datas_light.txt' | ||
| site_datas = 'assistant/knowledge/data_store_files/default/site_datas.txt' | ||
| _BASE_DIR = Path(__file__).resolve().parents[1] | ||
| _KNOWLEDGE_DIR = _BASE_DIR / "data" / "knowledge" | ||
| _DATA_STORE_DIR = _KNOWLEDGE_DIR / "data_store_files" / "default" | ||
|
|
||
| site_datas_light = str(_DATA_STORE_DIR / "site_datas_light.txt") | ||
| site_datas = str(_DATA_STORE_DIR / "site_datas.txt") | ||
|
|
||
| LOCAL_PATH = site_datas | ||
| docs_pickle_path = "assistant/knowledge/data_store_files/default/docs.pickle" | ||
| docs_pickle_path = str(_DATA_STORE_DIR / "docs.pickle") | ||
| origin = LOCAL | ||
|
|
||
| @classmethod | ||
| def get_vector_store(cls, oringin_preference = PICKLE) -> FAISS: | ||
| def get_vector_store( | ||
| cls, | ||
| origin_preference: str = PICKLE, | ||
| oringin_preference: str | None = None, | ||
| ) -> FAISS: | ||
| """ | ||
| Retrieves documents from a pickle file and creates a vector store. | ||
| Retrieve or build a vector store. | ||
|
|
||
| Returns: | ||
| FAISS: Vector store. | ||
| Args: | ||
| origin_preference: Preferred source (`pickle`, `web`, or `text`). | ||
| oringin_preference: Backward-compatible misspelled parameter name. | ||
| """ | ||
| selected_preference = oringin_preference or origin_preference | ||
|
|
||
| try: | ||
| if oringin_preference == cls.PICKLE: | ||
| vector_store = cls.get_doc_from_pickel() | ||
| print("Vector Store sucessufly loaded") | ||
|
|
||
| elif oringin_preference in [cls.WEB, cls.LOCAL]: | ||
| print("oringin_preference: ", oringin_preference) | ||
| vector_store = cls.create_db_critical(oringin_preference) | ||
| else: | ||
| raise ValueError("Invalid origin") | ||
|
|
||
| except Exception as e: | ||
| print(f"DataStore - get_vector_store() Error: {e}") | ||
| vector_store = cls.create_db_critical(cls.origin) | ||
|
|
||
| finally: | ||
| print("DataStore - Vector Store obtained:\n", vector_store) | ||
| if selected_preference == cls.PICKLE: | ||
| vector_store = cls.get_doc_from_pickle() | ||
| logger.info("Vector store loaded from pickle source") | ||
| elif selected_preference in [cls.WEB, cls.LOCAL]: | ||
| logger.info("Creating vector store from origin", extra={"origin": selected_preference}) | ||
| vector_store = cls.create_db_critical(selected_preference) | ||
| else: | ||
| raise ValueError(f"Invalid origin preference: {selected_preference}") | ||
|
|
||
| logger.info("Vector store obtained successfully") | ||
| return vector_store | ||
|
|
||
| except Exception: | ||
| logger.exception("Error obtaining vector store. Falling back to default origin") | ||
| return cls.create_db_critical(cls.origin) | ||
|
|
||
| @classmethod | ||
| @mt.debugger_exception_decorator | ||
| def get_doc_from_pickel(cls) -> FAISS: | ||
| def get_doc_from_pickle(cls) -> FAISS: | ||
| """ | ||
| Retrieves documents from a pickle file and creates a vector store. | ||
|
|
||
| Returns: | ||
| FAISS: Vector store. | ||
| Build a vector store from documents loaded via pickle cache. | ||
| """ | ||
| print("Checking docs_pickle_path...") | ||
|
|
||
| if os.path.exists(cls.docs_pickle_path): | ||
| with open(cls.docs_pickle_path, 'rb') as f: | ||
| docs = pickle.load(f) | ||
| print("Doc pickel file load from: ", cls.docs_pickle_path) | ||
|
|
||
| else: | ||
| docs = cls.prepare_doc_to_be_pickeled() | ||
| docs = cls.pickle_handler(docs) | ||
|
|
||
| print("Docs After Pickel: ", docs[2]) | ||
| embedding = OpenAIEmbeddings() | ||
| vector_store = FAISS.from_documents(docs, embedding=embedding) | ||
|
|
||
| return vector_store | ||
|
|
||
| docs = cls._load_or_create_pickled_docs() | ||
| embedding = cls._load_embeddings() | ||
| return cls._build_vector_store(docs, embedding) | ||
|
|
||
| @classmethod | ||
| # Backward-compatibility alias | ||
| def get_doc_from_pickel(cls) -> FAISS: | ||
| return cls.get_doc_from_pickle() | ||
|
|
||
| @classmethod | ||
| @mt.debugger_exception_decorator | ||
| def prepare_doc_to_be_pickeled(cls) -> list: | ||
|
||
| """ | ||
| Prepares documents to be pickled. | ||
|
|
||
| Returns: | ||
| list: List of prepared documents. | ||
| Prepare documents to be pickled. | ||
| """ | ||
| if cls.origin == cls.WEB: | ||
| loader = DocusaurusLoader(url=cls.SITE_URL) | ||
|
|
||
| if cls.origin == cls.LOCAL: | ||
| loader = TextLoader(file_path=cls.LOCAL_PATH, encoding='utf-8') | ||
| docs = cls._load_documents(cls.origin, chunk_size=750, chunk_overlap=30) | ||
| logger.info("Prepared documents for pickle", extra={"count": len(docs), "origin": cls.origin}) | ||
| return docs | ||
|
|
||
| docs = loader.load() | ||
| splitter = RecursiveCharacterTextSplitter(chunk_size=750, chunk_overlap=30) | ||
| splitDocs = splitter.split_documents(docs) | ||
|
|
||
| # print("prepare_doc_to_be_pickeled() - splitDocs:\n", splitDocs[2]) | ||
| print("prepare_doc_to_be_pickeled() - splitDocs SIZE: ", len(splitDocs)) | ||
| return splitDocs | ||
|
|
||
| @classmethod | ||
| @mt.debugger_exception_decorator | ||
| def pickle_handler(cls, parameter: list) -> list: | ||
| """ | ||
| Handles pickling of documents. | ||
|
|
||
| Args: | ||
| parameter (list): List of documents. | ||
|
|
||
| Returns: | ||
| list: List of documents. | ||
| Persist and return documents in pickle format. | ||
| """ | ||
| pickle_path = Path(cls.docs_pickle_path) | ||
| pickle_path.parent.mkdir(parents=True, exist_ok=True) | ||
|
|
||
| # Check if the pickle file exists | ||
| if not os.path.exists(cls.docs_pickle_path): | ||
| # If the pickle file doesn't exist, create it with the parameter | ||
| with open(cls.docs_pickle_path, 'wb') as f: | ||
| pickle.dump(parameter, f) | ||
| print(f"The pickle file '{cls.docs_pickle_path}' was successfully created.") | ||
| if not pickle_path.exists(): | ||
| with pickle_path.open("wb") as file_handler: | ||
| pickle.dump(parameter, file_handler) | ||
| logger.info("Created pickle file", extra={"path": str(pickle_path)}) | ||
| else: | ||
| print(f"The pickle file '{cls.docs_pickle_path}' already exists.") | ||
|
|
||
| # Load the pickle file and return the variable | ||
| with open(cls.docs_pickle_path, 'rb') as f: | ||
| loaded_variable = pickle.load(f) | ||
| print(f"Variable loaded from the pickle file: '{cls.docs_pickle_path}'") | ||
| # print(f"Loaded variable: '{loaded_variable}'") | ||
| # print(f"Loaded variable str: '{loaded_variable[2]}'") | ||
| logger.info("Pickle file already exists", extra={"path": str(pickle_path)}) | ||
|
|
||
| with pickle_path.open("rb") as file_handler: | ||
| loaded_variable = pickle.load(file_handler) | ||
|
|
||
| logger.info("Loaded variable from pickle", extra={"path": str(pickle_path)}) | ||
| return loaded_variable | ||
|
|
||
| @classmethod | ||
| # @retry(wait=wait_random_exponential(multiplier=1, max=40), stop=stop_after_attempt(3)) | ||
| def create_db_critical(cls, origin: str = WEB) -> FAISS: | ||
| """ | ||
| Creates a vector store with extra context for critical mode. | ||
| Create a vector store from web or local documents. | ||
| """ | ||
| docs = cls._load_documents(origin, chunk_size=550, chunk_overlap=30) | ||
| embedding = cls._load_embeddings() | ||
| return cls._build_vector_store(docs, embedding) | ||
|
|
||
| Args: | ||
| origin (str): Data source origin. Defaults to LOCAL. | ||
| @classmethod | ||
| def _load_or_create_pickled_docs(cls) -> list: | ||
| pickle_path = Path(cls.docs_pickle_path) | ||
|
|
||
| Returns: | ||
| FAISS: Vector store. | ||
| """ | ||
| print("DataStore: create_db_critical() Starts: ", origin) | ||
|
|
||
| def get_documents_from(origin=origin): | ||
| if origin == cls.WEB: | ||
| loader = DocusaurusLoader( | ||
| url=cls.SITE_URL, | ||
| # filter_urls=[ | ||
| # "https://kobu.agency/case-studies" | ||
| # ], | ||
| # parsing_function=remove_nav_and_header_elements, | ||
| ) | ||
|
|
||
| if origin == cls.LOCAL: | ||
| file_path = cls.LOCAL_PATH # site_datas_light.txt or site_datas.txt (site inteiro) | ||
| loader = TextLoader(file_path=file_path, encoding='utf-8') | ||
|
|
||
| docs = loader.load() | ||
| splitter = RecursiveCharacterTextSplitter( | ||
| chunk_size=550, | ||
| chunk_overlap=30, # separators = ['<p>', '<br>','</p>', '\n'] | ||
| ) | ||
| splitDocs = splitter.split_documents(docs) | ||
| print("splitDocs", object) | ||
| print("splitDocs", splitDocs[8]) | ||
|
|
||
| return splitDocs | ||
|
|
||
| docs = get_documents_from(origin) | ||
| embedding = OpenAIEmbeddings() | ||
| print("openaiembeddings") | ||
| vector_store = FAISS.from_documents(docs, embedding=embedding) | ||
| # new = OpenAI | ||
| return vector_store | ||
| if pickle_path.exists(): | ||
| with pickle_path.open("rb") as file_handler: | ||
| docs = pickle.load(file_handler) | ||
|
Comment on lines
+134
to
+136
|
||
| logger.info("Loaded docs from pickle", extra={"path": str(pickle_path), "count": len(docs)}) | ||
| return docs | ||
|
|
||
| docs = cls.prepare_doc_to_be_pickeled() | ||
| return cls.pickle_handler(docs) | ||
|
|
||
| @classmethod | ||
| def _get_loader(cls, origin: str): | ||
| if origin == cls.WEB: | ||
| return DocusaurusLoader(url=cls.SITE_URL) | ||
|
|
||
| if origin == cls.LOCAL: | ||
| if not os.path.exists(cls.LOCAL_PATH): | ||
| raise FileNotFoundError(f"Local knowledge file not found: {cls.LOCAL_PATH}") | ||
| return TextLoader(file_path=cls.LOCAL_PATH, encoding="utf-8") | ||
|
|
||
| raise ValueError(f"Invalid origin: {origin}") | ||
|
|
||
| @classmethod | ||
| def _load_documents(cls, origin: str, chunk_size: int, chunk_overlap: int) -> list: | ||
| loader = cls._get_loader(origin) | ||
| docs = loader.load() | ||
| splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap) | ||
| split_docs = splitter.split_documents(docs) | ||
| logger.info( | ||
| "Loaded and split documents", | ||
| extra={ | ||
| "origin": origin, | ||
| "chunk_size": chunk_size, | ||
| "chunk_overlap": chunk_overlap, | ||
| "count": len(split_docs), | ||
| }, | ||
| ) | ||
| return split_docs | ||
|
|
||
| @staticmethod | ||
| def _load_embeddings() -> OpenAIEmbeddings: | ||
| logger.info("Loading embeddings") | ||
| return OpenAIEmbeddings() | ||
|
|
||
| @staticmethod | ||
| def _build_vector_store(docs: list, embedding: OpenAIEmbeddings) -> FAISS: | ||
| logger.info("Building vector store", extra={"count": len(docs)}) | ||
| return FAISS.from_documents(docs, embedding=embedding) | ||
|
Comment on lines
+131
to
+180
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The exception handling in this method has a potential issue. If an exception occurs (lines 70–72), the fallback calls
`cls.create_db_critical(cls.origin)`. However, if `cls.origin` is set to `LOCAL` (as it is by default on line 40) and the local file doesn't exist, this will raise a `FileNotFoundError` from the `_get_loader` method (line 150), which won't be caught and will propagate to the caller. Consider either catching and handling this specific case differently, or ensuring that when falling back, a known-working origin is used instead of `cls.origin`.