-
Notifications
You must be signed in to change notification settings - Fork 2.8k
Add options for handling duplicate documents (skip, fail, overwrite) #1088
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
46254fc
b3999de
6a20487
a2bf28f
07ad3b3
7083a6e
f51e2c0
4cad8fb
e90ad58
e7710dd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,6 +6,7 @@ | |
| import numpy as np | ||
|
|
||
| from haystack import Document, Label, MultiLabel, BaseComponent | ||
| from haystack.errors import DuplicateDocumentError | ||
| from haystack.preprocessor.preprocessor import PreProcessor | ||
| from haystack.preprocessor.utils import eval_data_from_json, eval_data_from_jsonl, squad_json_to_jsonl | ||
|
|
||
|
|
@@ -19,9 +20,11 @@ class BaseDocumentStore(BaseComponent): | |
| index: Optional[str] | ||
| label_index: Optional[str] | ||
| similarity: Optional[str] | ||
| duplicate_documents_options: tuple = ('skip', 'overwrite', 'fail') | ||
|
tholor marked this conversation as resolved.
|
||
|
|
||
| @abstractmethod | ||
| def write_documents(self, documents: Union[List[dict], List[Document]], index: Optional[str] = None): | ||
| def write_documents(self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, | ||
| batch_size: int = 10_000, duplicate_documents: Optional[str] = None): | ||
|
tholor marked this conversation as resolved.
|
||
| """ | ||
| Indexes documents for later queries. | ||
|
|
||
|
|
@@ -32,6 +35,13 @@ def write_documents(self, documents: Union[List[dict], List[Document]], index: O | |
| It can be used for filtering and is accessible in the responses of the Finder. | ||
| :param index: Optional name of index where the documents shall be written to. | ||
| If None, the DocumentStore's default index (self.index) will be used. | ||
| :param batch_size: Number of documents that are passed to bulk function at a time. | ||
| :param duplicate_documents: Handle duplicates document based on parameter options. | ||
| Parameter options : ( 'skip','overwrite','fail') | ||
| skip: Ignore the duplicates documents | ||
| overwrite: Update any existing documents with the same ID when adding documents. | ||
| fail: an error is raised if the document ID of the document being added already | ||
| exists. | ||
|
|
||
| :return: None | ||
| """ | ||
|
|
@@ -214,3 +224,54 @@ def delete_documents(self, index: Optional[str] = None, filters: Optional[Dict[s | |
| def run(self, documents: List[dict], index: Optional[str] = None, **kwargs): # type: ignore | ||
| self.write_documents(documents=documents, index=index) | ||
| return kwargs, "output_1" | ||
|
|
||
| @abstractmethod | ||
| def get_documents_by_id(self, ids: List[str], index: Optional[str] = None, | ||
|
lalitpagaria marked this conversation as resolved.
|
||
| batch_size: int = 10_000) -> List[Document]: | ||
| pass | ||
|
|
||
| def _drop_duplicate_documents(self, documents: List[Document]) -> List[Document]: | ||
| """ | ||
| Drop duplicates documents based on same hash ID | ||
|
|
||
| :param documents: A list of Haystack Document objects. | ||
| :return: A list of Haystack Document objects. | ||
| """ | ||
| _hash_ids: list = [] | ||
| _documents: List[Document] = [] | ||
|
|
||
| for document in documents: | ||
| if document.id in _hash_ids: | ||
| logger.warning(f"Duplicate Documents: Document with id '{document.id}' already exists in index " | ||
| f"'{self.index}'") | ||
| continue | ||
| _documents.append(document) | ||
| _hash_ids.append(document.id) | ||
|
|
||
| return _documents | ||
|
|
||
| def _handle_duplicate_documents(self, documents: List[Document], duplicate_documents: Optional[str] = None): | ||
| """ | ||
| Handle duplicates documents | ||
|
|
||
| :param documents: A list of Haystack Document objects. | ||
| :param duplicate_documents: Handle duplicates document based on parameter options. | ||
| Parameter options : ( 'skip','overwrite','fail') | ||
| skip (default option): Ignore the duplicates documents | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wondering if we want to make
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In my view 'overwrite' will be good. User will get warning along with without any changes his latest/updated docs can be written to doc store.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I assume from the above comment, overwrite will be the default option.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think overwrite could be a good default value. I assume it'd overwrite the meta fields if they're different from the original? |
||
| overwrite: Update any existing documents with the same ID when adding documents. | ||
| fail: an error is raised if the document ID of the document being added already | ||
| exists. | ||
| :return: A list of Haystack Document objects. | ||
| """ | ||
| if duplicate_documents in ('skip', 'fail'): | ||
| documents = self._drop_duplicate_documents(documents) | ||
| documents_found = self.get_documents_by_id(ids=[doc.id for doc in documents], index=self.index) | ||
| ids_exist_in_db = [doc.id for doc in documents_found] | ||
|
|
||
| if len(ids_exist_in_db) > 0 and duplicate_documents == 'fail': | ||
| raise DuplicateDocumentError(f"Document with ids '{', '.join(ids_exist_in_db)} already exists" | ||
| f" in index = '{self.index}'.") | ||
|
|
||
| documents = list(filter(lambda doc: doc.id not in ids_exist_in_db, documents)) | ||
|
|
||
| return documents | ||
Uh oh!
There was an error while loading. Please reload this page.