From 46254fc49b84164e2eb1c7f28038c78ff9f8120b Mon Sep 17 00:00:00 2001 From: Ikram Ali Date: Sun, 23 May 2021 15:27:05 +0500 Subject: [PATCH 01/10] [document_stores] Duplicate document implmentation added for memorystore. --- haystack/document_store/base.py | 32 +++++++++++++++++++++++++- haystack/document_store/memory.py | 38 +++++++++++++++++++++++++------ 2 files changed, 62 insertions(+), 8 deletions(-) diff --git a/haystack/document_store/base.py b/haystack/document_store/base.py index 4658de2beb..64f2cc9883 100644 --- a/haystack/document_store/base.py +++ b/haystack/document_store/base.py @@ -19,9 +19,11 @@ class BaseDocumentStore(BaseComponent): index: Optional[str] label_index: Optional[str] similarity: Optional[str] + duplicate_documents_options: List[str] = ['skip', 'overwrite', 'fail'] @abstractmethod - def write_documents(self, documents: Union[List[dict], List[Document]], index: Optional[str] = None): + def write_documents(self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, + duplicate_documents: Optional[str] = None): """ Indexes documents for later queries. @@ -32,6 +34,11 @@ def write_documents(self, documents: Union[List[dict], List[Document]], index: O It can be used for filtering and is accessible in the responses of the Finder. :param index: Optional name of index where the documents shall be written to. If None, the DocumentStore's default index (self.index) will be used. + :param duplicate_documents: Handle duplicates document based on parameter options. + Parameter options : ( 'skip','overwrite','fail') + skip: (Default option): Ignore the duplicates documents + overwrite: Overwrite the documents if exist + fail: Thrown exception if document exists. :return: None """ @@ -214,3 +221,26 @@ def delete_documents(self, index: Optional[str] = None, filters: Optional[Dict[s def run(self, documents: List[dict], index: Optional[str] = None, **kwargs): # type: ignore self.write_documents(documents=documents, index=index) return kwargs, "output_1" + + def drop_duplicate_documents(self, documents: List[Document]) -> List[Document]: + """ + Drop duplicates documents based on same hash ID + + :param documents: A list of Haystack Document objects. + :return: A list of Haystack Document objects. + """ + _hash_ids: list = [] + _documents: List[Document] = [] + + for document in documents: + if document.id in _hash_ids: + continue + _documents.append(document) + _hash_ids.append(document.id) + + return _documents + + +class DuplicateDocumentError(Exception): + """Exception for Duplicate document""" + pass diff --git a/haystack/document_store/memory.py b/haystack/document_store/memory.py index 56962c014b..c88849bf42 100644 --- a/haystack/document_store/memory.py +++ b/haystack/document_store/memory.py @@ -10,7 +10,7 @@ from tqdm import tqdm from haystack import Document, Label -from haystack.document_store.base import BaseDocumentStore +from haystack.document_store.base import BaseDocumentStore, DuplicateDocumentError from haystack.retriever.base import BaseRetriever from haystack.utils import get_batches_from_generator @@ -31,6 +31,7 @@ def __init__( return_embedding: bool = False, similarity: str = "dot_product", progress_bar: bool = True, + duplicate_documents: str = 'skip', ): """ :param index: The documents are scoped to an index attribute that can be used when writing, querying, @@ -43,12 +44,18 @@ def __init__( more performant with DPR embeddings. 'cosine' is recommended if you are using a Sentence BERT model. :param progress_bar: Whether to show a tqdm progress bar or not. Can be helpful to disable in production deployments to keep the logs clean. + :param duplicate_documents: Handle duplicates document based on parameter options. + Parameter options : ('skip','overwrite','fail') + skip: (Default option): Ignore the duplicates documents + overwrite: Overwrite the documents if exist + fail: Thrown exception if document exists. """ # save init parameters to enable export of component config as YAML self.set_config( - index=index, label_index=label_index, embedding_field=embedding_field, embedding_dim=embedding_dim, - return_embedding=return_embedding, similarity=similarity, progress_bar=progress_bar, + index=index, label_index=label_index, embedding_field=embedding_field, embedding_dim=embedding_dim, + return_embedding=return_embedding, similarity=similarity, progress_bar=progress_bar, + duplicate_documents=duplicate_documents, ) self.indexes: Dict[str, Dict] = defaultdict(dict) @@ -59,8 +66,10 @@ def __init__( self.return_embedding = return_embedding self.similarity = similarity self.progress_bar = progress_bar + self.duplicate_documents = duplicate_documents - def write_documents(self, documents: Union[List[dict], List[Document]], index: Optional[str] = None): + def write_documents(self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, + duplicate_documents: Optional[str] = None): """ Indexes documents for later queries. @@ -72,18 +81,33 @@ def write_documents(self, documents: Union[List[dict], List[Document]], index: O It can be used for filtering and is accessible in the responses of the Finder. :param index: write documents to a custom namespace. For instance, documents for evaluation can be indexed in a separate index than the documents for search. + :param duplicate_documents: Handle duplicates document based on parameter options. + Parameter options : ( 'skip','overwrite','fail') + skip: (Default option): Ignore the duplicates documents + overwrite: Overwrite the documents if exist + fail: Thrown exception if document exists. + :raises DuplicateDocumentError: Exception trigger on duplicate document :return: None """ index = index or self.index + duplicate_documents = duplicate_documents or self.duplicate_documents + assert duplicate_documents in self.duplicate_documents_options, \ + f"duplicate_documents parameter must be {', '.join(self.duplicate_documents_options)}" field_map = self._create_document_field_map() documents = deepcopy(documents) - documents_objects = [Document.from_dict(d, field_map=field_map) if isinstance(d, dict) else d for d in documents] + documents_objects = [Document.from_dict(d, field_map=field_map) if isinstance(d, dict) else d for d in + documents] for document in documents_objects: if document.id in self.indexes[index]: - # TODO Make error type consistent across document stores and add user options to deal with duplicate documents (ignore, overwrite, fail) - raise ValueError(f"Duplicate Documents: write_documents() failed - Document with id '{document.id} already exists in index '{index}'") + if duplicate_documents == "fail": + raise DuplicateDocumentError(f"Document with id '{document.id} already " + f"exists in index '{index}'") + elif duplicate_documents == "skip": + logger.warning(f"Duplicate Documents: Document with id '{document.id} already exists in index " + f"'{index}'") + continue self.indexes[index][document.id] = document def _create_document_field_map(self): From b3999dea8575e0f3693142b4d8786b54f4d6fb2b Mon Sep 17 00:00:00 2001 From: Ikram Ali Date: Sun, 23 May 2021 16:05:17 +0500 Subject: [PATCH 02/10] [document_stores]duplicate documents implementation done for faiss store. --- haystack/document_store/base.py | 7 +++-- haystack/document_store/faiss.py | 47 +++++++++++++++++++++++-------- haystack/document_store/memory.py | 16 ++++++----- haystack/document_store/sql.py | 19 +++++++------ 4 files changed, 58 insertions(+), 31 deletions(-) diff --git a/haystack/document_store/base.py b/haystack/document_store/base.py index 64f2cc9883..90eb191b06 100644 --- a/haystack/document_store/base.py +++ b/haystack/document_store/base.py @@ -36,9 +36,10 @@ def write_documents(self, documents: Union[List[dict], List[Document]], index: O If None, the DocumentStore's default index (self.index) will be used. :param duplicate_documents: Handle duplicates document based on parameter options. Parameter options : ( 'skip','overwrite','fail') - skip: (Default option): Ignore the duplicates documents - overwrite: Overwrite the documents if exist - fail: Thrown exception if document exists. + skip (default option): Ignore the duplicates documents + overwrite: Update any existing documents with the same ID when adding documents. + fail: an error is raised if the document ID of the document being added already + exists. :return: None """ diff --git a/haystack/document_store/faiss.py b/haystack/document_store/faiss.py index a3a63c7543..03cda24981 100644 --- a/haystack/document_store/faiss.py +++ b/haystack/document_store/faiss.py @@ -13,7 +13,7 @@ from haystack.retriever.base import BaseRetriever from haystack.utils import get_batches_from_generator from scipy.special import expit - +from haystack.document_store.base import DuplicateDocumentError logger = logging.getLogger(__name__) @@ -37,11 +37,11 @@ def __init__( faiss_index_factory_str: str = "Flat", faiss_index: Optional["faiss.swigfaiss.Index"] = None, return_embedding: bool = False, - update_existing_documents: bool = False, index: str = "document", similarity: str = "dot_product", embedding_field: str = "embedding", progress_bar: bool = True, + duplicate_documents: str = 'skip', **kwargs, ): """ @@ -66,23 +66,25 @@ def __init__( :param faiss_index: Pass an existing FAISS Index, i.e. an empty one that you configured manually or one with docs that you used in Haystack before and want to load again. :param return_embedding: To return document embedding - :param update_existing_documents: Whether to update any existing documents with the same ID when adding - documents. When set as True, any document with an existing ID gets updated. - If set to False, an error is raised if the document ID of the document being - added already exists. :param index: Name of index in document store to use. :param similarity: The similarity function used to compare document vectors. 'dot_product' is the default sine it is more performant with DPR embeddings. 'cosine' is recommended if you are using a Sentence BERT model. :param embedding_field: Name of field containing an embedding vector. :param progress_bar: Whether to show a tqdm progress bar or not. Can be helpful to disable in production deployments to keep the logs clean. + :param duplicate_documents: Handle duplicates document based on parameter options. + Parameter options : ( 'skip','overwrite','fail') + skip (default option): Ignore the duplicates documents + overwrite: Update any existing documents with the same ID when adding documents. + fail: an error is raised if the document ID of the document being added already + exists. """ # save init parameters to enable export of component config as YAML self.set_config( sql_url=sql_url, vector_dim=vector_dim, faiss_index_factory_str=faiss_index_factory_str, faiss_index=faiss_index, return_embedding=return_embedding, - update_existing_documents=update_existing_documents, index=index, similarity=similarity, + duplicate_documents=duplicate_documents, index=index, similarity=similarity, embedding_field=embedding_field, progress_bar=progress_bar ) @@ -110,7 +112,7 @@ def __init__( super().__init__( url=sql_url, - update_existing_documents=update_existing_documents, + duplicate_documents=duplicate_documents, index=index ) @@ -130,9 +132,8 @@ def _create_new_index(self, vector_dim: int, metric_type, index_factory: str = " index = faiss.index_factory(vector_dim, index_factory, metric_type) return index - def write_documents( - self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, batch_size: int = 10_000 - ): + def write_documents(self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, + batch_size: int = 10_000, duplicate_documents: Optional[str] = None): """ Add new documents to the DocumentStore. @@ -140,10 +141,21 @@ def write_documents( them right away in FAISS. If not, you can later call update_embeddings() to create & index them. :param index: (SQL) index name for storing the docs and metadata :param batch_size: When working with large number of documents, batching can help reduce memory footprint. + :param duplicate_documents: Handle duplicates document based on parameter options. + Parameter options : ( 'skip','overwrite','fail') + skip (default option): Ignore the duplicates documents + overwrite: Update any existing documents with the same ID when adding documents. + fail: an error is raised if the document ID of the document being added already + exists. + :raises DuplicateDocumentError: Exception trigger on duplicate document :return: """ index = index or self.index + duplicate_documents = duplicate_documents or self.duplicate_documents + assert duplicate_documents in self.duplicate_documents_options, \ + f"duplicate_documents parameter must be {', '.join(self.duplicate_documents_options)}" + if not self.faiss_indexes.get(index): self.faiss_indexes[index] = self._create_new_index( vector_dim=self.vector_dim, @@ -153,10 +165,21 @@ def write_documents( field_map = self._create_document_field_map() document_objects = [Document.from_dict(d, field_map=field_map) if isinstance(d, dict) else d for d in documents] + document_objects = self.drop_duplicate_documents(document_objects) + + if duplicate_documents in ('fail', 'skip'): + _documents = super(FAISSDocumentStore, self).get_documents_by_id(ids=[doc.id for doc in document_objects], + index=index) + _ids_exist_in_db = [doc.id for doc in _documents] + if duplicate_documents == "skip": + document_objects = list(filter(lambda doc: doc.id not in _ids_exist_in_db, document_objects)) + else: + raise DuplicateDocumentError(f"Document with ids '{', '.join(_ids_exist_in_db)} already exists" + f" in Database.") add_vectors = False if document_objects[0].embedding is None else True - if self.update_existing_documents and add_vectors: + if self.duplicate_documents == "overwrite" and add_vectors: logger.warning("You have enabled `update_existing_documents` feature and " "`FAISSDocumentStore` does not support update in existing `faiss_index`.\n" "Please call `update_embeddings` method to repopulate `faiss_index`") diff --git a/haystack/document_store/memory.py b/haystack/document_store/memory.py index c88849bf42..6c3cdc22c5 100644 --- a/haystack/document_store/memory.py +++ b/haystack/document_store/memory.py @@ -45,10 +45,11 @@ def __init__( :param progress_bar: Whether to show a tqdm progress bar or not. Can be helpful to disable in production deployments to keep the logs clean. :param duplicate_documents: Handle duplicates document based on parameter options. - Parameter options : ('skip','overwrite','fail') - skip: (Default option): Ignore the duplicates documents - overwrite: Overwrite the documents if exist - fail: Thrown exception if document exists. + Parameter options : ( 'skip','overwrite','fail') + skip (default option): Ignore the duplicates documents + overwrite: Update any existing documents with the same ID when adding documents. + fail: an error is raised if the document ID of the document being added already + exists. """ # save init parameters to enable export of component config as YAML @@ -83,9 +84,10 @@ def write_documents(self, documents: Union[List[dict], List[Document]], index: O separate index than the documents for search. :param duplicate_documents: Handle duplicates document based on parameter options. Parameter options : ( 'skip','overwrite','fail') - skip: (Default option): Ignore the duplicates documents - overwrite: Overwrite the documents if exist - fail: Thrown exception if document exists. + skip (default option): Ignore the duplicates documents + overwrite: Update any existing documents with the same ID when adding documents. + fail: an error is raised if the document ID of the document being added already + exists. :raises DuplicateDocumentError: Exception trigger on duplicate document :return: None """ diff --git a/haystack/document_store/sql.py b/haystack/document_store/sql.py index 27dc1caaf0..6b468adef5 100644 --- a/haystack/document_store/sql.py +++ b/haystack/document_store/sql.py @@ -73,7 +73,7 @@ def __init__( url: str = "sqlite://", index: str = "document", label_index: str = "label", - update_existing_documents: bool = False, + duplicate_documents: str = 'skip', ): """ An SQL backed DocumentStore. Currently supports SQLite, PostgreSQL and MySQL backends. @@ -82,16 +82,17 @@ def __init__( :param index: The documents are scoped to an index attribute that can be used when writing, querying, or deleting documents. This parameter sets the default value for document index. :param label_index: The default value of index attribute for the labels. - :param update_existing_documents: Whether to update any existing documents with the same ID when adding - documents. When set as True, any document with an existing ID gets updated. - If set to False, an error is raised if the document ID of the document being - added already exists. Using this parameter could cause performance degradation - for document insertion. + :param duplicate_documents: Handle duplicates document based on parameter options. + Parameter options : ( 'skip','overwrite','fail') + skip (default option): Ignore the duplicates documents + overwrite: Update any existing documents with the same ID when adding documents. + fail: an error is raised if the document ID of the document being added already + exists. """ # save init parameters to enable export of component config as YAML self.set_config( - url=url, index=index, label_index=label_index, update_existing_documents=update_existing_documents + url=url, index=index, label_index=label_index, duplicate_documents=duplicate_documents ) engine = create_engine(url) @@ -100,7 +101,7 @@ def __init__( self.session = Session() self.index: str = index self.label_index = label_index - self.update_existing_documents = update_existing_documents + self.duplicate_documents = duplicate_documents if getattr(self, "similarity", None) is None: self.similarity = None self.use_windowed_query = True @@ -299,7 +300,7 @@ def write_documents( vector_id = meta_fields.pop("vector_id", None) meta_orms = [MetaORM(name=key, value=value) for key, value in meta_fields.items()] doc_orm = DocumentORM(id=doc.id, text=doc.text, vector_id=vector_id, meta=meta_orms, index=index) - if self.update_existing_documents: + if self.duplicate_documents == "overwrite": # First old meta data cleaning is required self.session.query(MetaORM).filter_by(document_id=doc.id).delete() self.session.merge(doc_orm) From 6a204872a40ae95cbdc164c6468746fbeeb0e72c Mon Sep 17 00:00:00 2001 From: Ikram Ali Date: Sun, 23 May 2021 16:42:15 +0500 Subject: [PATCH 03/10] [document_store] Duplicate document feature added for elasticsearch document store fixed #1069 --- haystack/document_store/elasticsearch.py | 58 ++++++++++++++++-------- 1 file changed, 38 insertions(+), 20 deletions(-) diff --git a/haystack/document_store/elasticsearch.py b/haystack/document_store/elasticsearch.py index 9f41aef0cc..ca068cee89 100644 --- a/haystack/document_store/elasticsearch.py +++ b/haystack/document_store/elasticsearch.py @@ -11,7 +11,7 @@ from scipy.special import expit from tqdm.auto import tqdm -from haystack.document_store.base import BaseDocumentStore +from haystack.document_store.base import BaseDocumentStore, DuplicateDocumentError from haystack import Document, Label from haystack.utils import get_batches_from_generator @@ -43,11 +43,11 @@ def __init__( ca_certs: Optional[str] = None, verify_certs: bool = True, create_index: bool = True, - update_existing_documents: bool = False, refresh_type: str = "wait_for", similarity="dot_product", timeout=30, return_embedding: bool = False, + duplicate_documents: str = 'skip', ): """ A DocumentStore using Elasticsearch to store and query the documents for our search. @@ -80,11 +80,7 @@ def __init__( :param scheme: 'https' or 'http', protocol used to connect to your elasticsearch instance :param ca_certs: Root certificates for SSL: it is a path to certificate authority (CA) certs on disk. You can use certifi package with certifi.where() to find where the CA certs file is located in your machine. :param verify_certs: Whether to be strict about ca certificates - :param create_index: Whether to try creating a new index (If the index of that name is already existing, we will just continue in any case) - :param update_existing_documents: Whether to update any existing documents with the same ID when adding - documents. When set as True, any document with an existing ID gets updated. - If set to False, an error is raised if the document ID of the document being - added already exists. + :param create_index: Whether to try creating a new index (If the index of that name is already existing, we will just continue in any case :param refresh_type: Type of ES refresh used to control when changes made by a request (e.g. bulk) are made visible to search. If set to 'wait_for', continue only after changes are visible (slow, but safe). If set to 'false', continue directly (fast, but sometimes unintuitive behaviour when docs are not immediately available after ingestion). @@ -93,6 +89,12 @@ def __init__( more performant with DPR embeddings. 'cosine' is recommended if you are using a Sentence BERT model. :param timeout: Number of seconds after which an ElasticSearch request times out. :param return_embedding: To return document embedding + :param duplicate_documents: Handle duplicates document based on parameter options. + Parameter options : ( 'skip','overwrite','fail') + skip (default option): Ignore the duplicates documents + overwrite: Update any existing documents with the same ID when adding documents. + fail: an error is raised if the document ID of the document being added already + exists. """ # save init parameters to enable export of component config as YAML @@ -102,7 +104,7 @@ def __init__( name_field=name_field, embedding_field=embedding_field, embedding_dim=embedding_dim, custom_mapping=custom_mapping, excluded_meta_data=excluded_meta_data, analyzer=analyzer, scheme=scheme, ca_certs=ca_certs, verify_certs=verify_certs, create_index=create_index, - update_existing_documents=update_existing_documents, refresh_type=refresh_type, similarity=similarity, + duplicate_documents=duplicate_documents, refresh_type=refresh_type, similarity=similarity, timeout=timeout, return_embedding=return_embedding, ) @@ -137,7 +139,7 @@ def __init__( self._create_document_index(index) self._create_label_index(label_index) - self.update_existing_documents = update_existing_documents + self.duplicate_documents = duplicate_documents self.refresh_type = refresh_type def _init_elastic_client(self, @@ -349,9 +351,8 @@ def get_metadata_values_by_key( bucket["value"] = bucket.pop("key") return buckets - def write_documents( - self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, batch_size: int = 10_000 - ): + def write_documents(self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, + batch_size: int = 10_000,duplicate_documents: Optional[str] = None): """ Indexes documents for later queries in Elasticsearch. @@ -371,6 +372,13 @@ def write_documents( should be changed to what you have set for self.text_field and self.name_field. :param index: Elasticsearch index where the documents should be indexed. If not supplied, self.index will be used. :param batch_size: Number of documents that are passed to Elasticsearch's bulk function at a time. + :param duplicate_documents: Handle duplicates document based on parameter options. + Parameter options : ( 'skip','overwrite','fail') + skip (default option): Ignore the duplicates documents + overwrite: Update any existing documents with the same ID when adding documents. + fail: an error is raised if the document ID of the document being added already + exists. + :raises DuplicateDocumentError: Exception trigger on duplicate document :return: None """ @@ -379,17 +387,27 @@ def write_documents( if index is None: index = self.index - - documents_to_index = [] - for document in documents: - # Make sure we comply to Document class format - if isinstance(document, dict): - doc = Document.from_dict(document, field_map=self._create_document_field_map()) + duplicate_documents = duplicate_documents or self.duplicate_documents + assert duplicate_documents in self.duplicate_documents_options, \ + f"duplicate_documents parameter must be {', '.join(self.duplicate_documents_options)}" + + field_map = self._create_document_field_map() + document_objects = [Document.from_dict(d, field_map=field_map) if isinstance(d, dict) else d for d in documents] + document_objects = self.drop_duplicate_documents(document_objects) + + if duplicate_documents in ('fail', 'skip'): + _documents = self.get_documents_by_id(ids=[doc.id for doc in document_objects], index=index) + _ids_exist_in_es = [doc.id for doc in _documents] + if duplicate_documents == "skip": + document_objects = list(filter(lambda doc: doc.id not in _ids_exist_in_es, document_objects)) else: - doc = document + raise DuplicateDocumentError(f"Document with ids '{', '.join(_ids_exist_in_es)} already exists" + f" in elasticsearch index={index}.") + documents_to_index = [] + for doc in document_objects: _doc = { - "_op_type": "index" if self.update_existing_documents else "create", + "_op_type": "index" if self.duplicate_documents == 'overwrite' else "create", "_index": index, **doc.to_dict(field_map=self._create_document_field_map()) } # type: Dict[str, Any] From a2bf28f455b552503368e227e53d3b039e919f11 Mon Sep 17 00:00:00 2001 From: Ikram Ali Date: Sun, 23 May 2021 17:37:41 +0500 Subject: [PATCH 04/10] [document_store] Duplicate documents feature added for milvus document store and bug fixed in faiss document store fixed #1069 --- haystack/document_store/faiss.py | 5 ++-- haystack/document_store/milvus.py | 48 ++++++++++++++++++++++--------- haystack/document_store/sql.py | 17 ++++++----- 3 files changed, 48 insertions(+), 22 deletions(-) diff --git a/haystack/document_store/faiss.py b/haystack/document_store/faiss.py index 03cda24981..5d68381abd 100644 --- a/haystack/document_store/faiss.py +++ b/haystack/document_store/faiss.py @@ -109,10 +109,10 @@ def __init__( raise ValueError("The FAISS document store can currently only support dot_product similarity. " "Please set similarity=\"dot_product\"") self.progress_bar = progress_bar + self.duplicate_documents = duplicate_documents super().__init__( url=sql_url, - duplicate_documents=duplicate_documents, index=index ) @@ -199,7 +199,8 @@ def write_documents(self, documents: Union[List[dict], List[Document]], index: O vector_id += 1 docs_to_write_in_sql.append(doc) - super(FAISSDocumentStore, self).write_documents(docs_to_write_in_sql, index=index) + super(FAISSDocumentStore, self).write_documents(docs_to_write_in_sql, index=index, + duplicate_documents=duplicate_documents) def _create_document_field_map(self) -> Dict: return { diff --git a/haystack/document_store/milvus.py b/haystack/document_store/milvus.py index a40744b02b..e6b20275d8 100644 --- a/haystack/document_store/milvus.py +++ b/haystack/document_store/milvus.py @@ -12,6 +12,7 @@ from haystack.document_store.sql import SQLDocumentStore from haystack.retriever.base import BaseRetriever from haystack.utils import get_batches_from_generator +from haystack.document_store.base import DuplicateDocumentError logger = logging.getLogger(__name__) @@ -46,10 +47,10 @@ def __init__( index_type: IndexType = IndexType.FLAT, index_param: Optional[Dict[str, Any]] = None, search_param: Optional[Dict[str, Any]] = None, - update_existing_documents: bool = False, return_embedding: bool = False, embedding_field: str = "embedding", progress_bar: bool = True, + duplicate_documents: str = 'skip', **kwargs, ): """ @@ -85,21 +86,23 @@ def __init__( :param search_param: Configuration parameters for the chose index_type needed at query time For example: {"nprobe": 10} as the number of cluster units to query for index_type IVF_FLAT. See https://milvus.io/docs/v1.0.0/index.md - :param update_existing_documents: Whether to update any existing documents with the same ID when adding - documents. When set as True, any document with an existing ID gets updated. - If set to False, an error is raised if the document ID of the document being - added already exists. :param return_embedding: To return document embedding. :param embedding_field: Name of field containing an embedding vector. :param progress_bar: Whether to show a tqdm progress bar or not. Can be helpful to disable in production deployments to keep the logs clean. + :param duplicate_documents: Handle duplicates document based on parameter options. + Parameter options : ( 'skip','overwrite','fail') + skip (default option): Ignore the duplicates documents + overwrite: Update any existing documents with the same ID when adding documents. + fail: an error is raised if the document ID of the document being added already + exists. """ # save init parameters to enable export of component config as YAML self.set_config( sql_url=sql_url, milvus_url=milvus_url, connection_pool=connection_pool, index=index, vector_dim=vector_dim, index_file_size=index_file_size, similarity=similarity, index_type=index_type, index_param=index_param, - search_param=search_param, update_existing_documents=update_existing_documents, + search_param=search_param, duplicate_documents=duplicate_documents, return_embedding=return_embedding, embedding_field=embedding_field, progress_bar=progress_bar, ) @@ -122,10 +125,10 @@ def __init__( self.return_embedding = return_embedding self.embedding_field = embedding_field self.progress_bar = progress_bar + self.duplicate_documents = duplicate_documents super().__init__( url=sql_url, - update_existing_documents=update_existing_documents, index=index ) @@ -162,9 +165,8 @@ def _create_document_field_map(self) -> Dict: self.index: self.embedding_field, } - def write_documents( - self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, batch_size: int = 10_000 - ): + def write_documents(self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, + batch_size: int = 10_000, duplicate_documents: Optional[str] = None): """ Add new documents to the DocumentStore. @@ -172,9 +174,19 @@ def write_documents( them right away in Milvus. If not, you can later call update_embeddings() to create & index them. :param index: (SQL) index name for storing the docs and metadata :param batch_size: When working with large number of documents, batching can help reduce memory footprint. + :param duplicate_documents: Handle duplicates document based on parameter options. + Parameter options : ( 'skip','overwrite','fail') + skip (default option): Ignore the duplicates documents + overwrite: Update any existing documents with the same ID when adding documents. + fail: an error is raised if the document ID of the document being added already + exists. + :raises DuplicateDocumentError: Exception trigger on duplicate document :return: """ index = index or self.index + duplicate_documents = duplicate_documents or self.duplicate_documents + assert duplicate_documents in self.duplicate_documents_options, \ + f"duplicate_documents parameter must be {', '.join(self.duplicate_documents_options)}" self._create_collection_and_index_if_not_exist(index) field_map = self._create_document_field_map() @@ -183,6 +195,16 @@ def write_documents( return document_objects = [Document.from_dict(d, field_map=field_map) if isinstance(d, dict) else d for d in documents] + document_objects = self.drop_duplicate_documents(document_objects) + + if duplicate_documents in ('fail', 'skip'): + _documents = self.get_documents_by_id(ids=[doc.id for doc in document_objects], index=index) + _ids_exist_in_db = [doc.id for doc in _documents] + if duplicate_documents == "skip": + document_objects = list(filter(lambda doc: doc.id not in _ids_exist_in_db, document_objects)) + else: + raise DuplicateDocumentError(f"Document with ids '{', '.join(_ids_exist_in_db)} already exists" + f" in Database.") add_vectors = False if document_objects[0].embedding is None else True @@ -203,7 +225,7 @@ def write_documents( raise AttributeError(f'Format of supplied document embedding {type(doc.embedding)} is not ' f'supported. Please use list or numpy.ndarray') - if self.update_existing_documents: + if duplicate_documents == 'overwrite': existing_docs = super().get_documents_by_id(ids=doc_ids, index=index) self._delete_vector_ids_from_milvus(documents=existing_docs, index=index) @@ -218,12 +240,12 @@ def write_documents( meta["vector_id"] = vector_ids[idx] docs_to_write_in_sql.append(doc) - super().write_documents(docs_to_write_in_sql, index=index) + super().write_documents(docs_to_write_in_sql, index=index, duplicate_documents=duplicate_documents) progress_bar.update(batch_size) progress_bar.close() self.milvus_server.flush([index]) - if self.update_existing_documents: + if duplicate_documents == 'overwrite': self.milvus_server.compact(collection_name=index) def update_embeddings( diff --git a/haystack/document_store/sql.py b/haystack/document_store/sql.py index 6b468adef5..9d9a12d7bd 100644 --- a/haystack/document_store/sql.py +++ b/haystack/document_store/sql.py @@ -73,7 +73,6 @@ def __init__( url: str = "sqlite://", index: str = "document", label_index: str = "label", - duplicate_documents: str = 'skip', ): """ An SQL backed DocumentStore. Currently supports SQLite, PostgreSQL and MySQL backends. @@ -92,7 +91,7 @@ def __init__( # save init parameters to enable export of component config as YAML self.set_config( - url=url, index=index, label_index=label_index, duplicate_documents=duplicate_documents + url=url, index=index, label_index=label_index ) engine = create_engine(url) @@ -101,7 +100,6 @@ def __init__( self.session = Session() self.index: str = index self.label_index = label_index - self.duplicate_documents = duplicate_documents if getattr(self, "similarity", None) is None: self.similarity = None self.use_windowed_query = True @@ -267,9 +265,8 @@ def get_all_labels(self, index=None, filters: Optional[dict] = None): return labels - def write_documents( - self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, batch_size: int = 10_000 - ): + def write_documents(self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, + batch_size: int = 10_000, duplicate_documents: Optional[str] = None): """ Indexes documents for later queries. @@ -281,6 +278,12 @@ def write_documents( :param index: add an optional index attribute to documents. It can be later used for filtering. For instance, documents for evaluation can be indexed in a separate index than the documents for search. :param batch_size: When working with large number of documents, batching can help reduce memory footprint. + :param duplicate_documents: Handle duplicates document based on parameter options. + Parameter options : ( 'skip','overwrite','fail') + skip (default option): Ignore the duplicates documents + overwrite: Update any existing documents with the same ID when adding documents. + fail: an error is raised if the document ID of the document being added already + exists. :return: None """ @@ -300,7 +303,7 @@ def write_documents( vector_id = meta_fields.pop("vector_id", None) meta_orms = [MetaORM(name=key, value=value) for key, value in meta_fields.items()] doc_orm = DocumentORM(id=doc.id, text=doc.text, vector_id=vector_id, meta=meta_orms, index=index) - if self.duplicate_documents == "overwrite": + if duplicate_documents == "overwrite": # First old meta data cleaning is required self.session.query(MetaORM).filter_by(document_id=doc.id).delete() self.session.merge(doc_orm) From 07ad3b3e24798591ec24e234fe40b1c8605bdb88 Mon Sep 17 00:00:00 2001 From: Ikram Ali Date: Sun, 23 May 2021 21:39:04 +0500 Subject: [PATCH 05/10] [document_store] Code refactored fixed #1069 --- haystack/document_store/base.py | 33 ++++++++++++++++++++++++ haystack/document_store/elasticsearch.py | 14 ++-------- haystack/document_store/faiss.py | 13 +--------- haystack/document_store/milvus.py | 12 +-------- 4 files changed, 37 insertions(+), 35 deletions(-) diff --git a/haystack/document_store/base.py b/haystack/document_store/base.py index 90eb191b06..78e5a4be22 100644 --- a/haystack/document_store/base.py +++ b/haystack/document_store/base.py @@ -223,6 +223,11 @@ def run(self, documents: List[dict], index: Optional[str] = None, **kwargs): # self.write_documents(documents=documents, index=index) return kwargs, "output_1" + @abstractmethod + def get_documents_by_id(self, ids: List[str], index: Optional[str] = None, + batch_size: int = 10_000) -> List[Document]: + pass + def drop_duplicate_documents(self, documents: List[Document]) -> List[Document]: """ Drop duplicates documents based on same hash ID @@ -235,12 +240,40 @@ def drop_duplicate_documents(self, documents: List[Document]) -> List[Document]: for document in documents: if document.id in _hash_ids: + logger.warning(f"Duplicate Documents: Document with id '{document.id}' already exists in index " + f"'{self.index}'") continue _documents.append(document) _hash_ids.append(document.id) return _documents + def handle_duplicate_documents(self, documents: List[Document], duplicate_documents: Optional[str] = None): + """ + Handle duplicates documents + + :param documents: A list of Haystack Document objects. + :param duplicate_documents: Handle duplicates document based on parameter options. + Parameter options : ( 'skip','overwrite','fail') + skip (default option): Ignore the duplicates documents + overwrite: Update any existing documents with the same ID when adding documents. + fail: an error is raised if the document ID of the document being added already + exists. + :return: A list of Haystack Document objects. + """ + if duplicate_documents in ('skip', 'fail'): + documents = self.drop_duplicate_documents(documents) + documents_found = self.get_documents_by_id(ids=[doc.id for doc in documents], index=self.index) + ids_exist_in_db = [doc.id for doc in documents_found] + + if len(ids_exist_in_db) > 0 and duplicate_documents == 'fail': + raise DuplicateDocumentError(f"Document with ids '{', '.join(ids_exist_in_db)} already exists" + f" in index = '{self.index}'.") + + documents = list(filter(lambda doc: doc.id not in ids_exist_in_db, documents)) + + return documents + class DuplicateDocumentError(Exception): """Exception for Duplicate document""" diff --git a/haystack/document_store/elasticsearch.py b/haystack/document_store/elasticsearch.py index ca068cee89..11d0cc2d90 100644 --- a/haystack/document_store/elasticsearch.py +++ b/haystack/document_store/elasticsearch.py @@ -393,21 +393,11 @@ def write_documents(self, documents: Union[List[dict], List[Document]], index: O field_map = self._create_document_field_map() document_objects = [Document.from_dict(d, field_map=field_map) if isinstance(d, dict) else d for d in documents] - document_objects = self.drop_duplicate_documents(document_objects) - - if duplicate_documents in ('fail', 'skip'): - _documents = self.get_documents_by_id(ids=[doc.id for doc in document_objects], index=index) - _ids_exist_in_es = [doc.id for doc in _documents] - if duplicate_documents == "skip": - document_objects = list(filter(lambda doc: doc.id not in _ids_exist_in_es, document_objects)) - else: - raise DuplicateDocumentError(f"Document with ids '{', '.join(_ids_exist_in_es)} already exists" - f" in elasticsearch index={index}.") - + document_objects = self.handle_duplicate_documents(document_objects, duplicate_documents) documents_to_index = [] for doc in document_objects: _doc = { - "_op_type": "index" if self.duplicate_documents == 'overwrite' else "create", + "_op_type": "index" if duplicate_documents == 'overwrite' else "create", "_index": index, **doc.to_dict(field_map=self._create_document_field_map()) } # type: Dict[str, Any] diff --git a/haystack/document_store/faiss.py b/haystack/document_store/faiss.py index 5d68381abd..a506d94a0a 100644 --- a/haystack/document_store/faiss.py +++ b/haystack/document_store/faiss.py @@ -165,18 +165,7 @@ def write_documents(self, documents: Union[List[dict], List[Document]], index: O field_map = self._create_document_field_map() document_objects = [Document.from_dict(d, field_map=field_map) if isinstance(d, dict) else d for d in documents] - document_objects = self.drop_duplicate_documents(document_objects) - - if duplicate_documents in ('fail', 'skip'): - _documents = super(FAISSDocumentStore, self).get_documents_by_id(ids=[doc.id for doc in document_objects], - index=index) - _ids_exist_in_db = [doc.id for doc in _documents] - if duplicate_documents == "skip": - document_objects = list(filter(lambda doc: doc.id not in _ids_exist_in_db, document_objects)) - else: - raise DuplicateDocumentError(f"Document with ids '{', '.join(_ids_exist_in_db)} already exists" - f" in Database.") - + document_objects = self.handle_duplicate_documents(document_objects, duplicate_documents) add_vectors = False if document_objects[0].embedding is None else True if self.duplicate_documents == "overwrite" and add_vectors: diff --git a/haystack/document_store/milvus.py b/haystack/document_store/milvus.py index e6b20275d8..640c2d30bf 100644 --- a/haystack/document_store/milvus.py +++ b/haystack/document_store/milvus.py @@ -195,17 +195,7 @@ def write_documents(self, documents: Union[List[dict], List[Document]], index: O return document_objects = [Document.from_dict(d, field_map=field_map) if isinstance(d, dict) else d for d in documents] - document_objects = self.drop_duplicate_documents(document_objects) - - if duplicate_documents in ('fail', 'skip'): - _documents = self.get_documents_by_id(ids=[doc.id for doc in document_objects], index=index) - _ids_exist_in_db = [doc.id for doc in _documents] - if duplicate_documents == "skip": - document_objects = list(filter(lambda doc: doc.id not in _ids_exist_in_db, document_objects)) - else: - raise DuplicateDocumentError(f"Document with ids '{', '.join(_ids_exist_in_db)} already exists" - f" in Database.") - + document_objects = self.handle_duplicate_documents(document_objects, duplicate_documents) add_vectors = False if document_objects[0].embedding is None else True batched_documents = get_batches_from_generator(document_objects, batch_size) From 7083a6ebc1332b28694ae5c5b46c941ee7726ec5 Mon Sep 17 00:00:00 2001 From: Ikram Ali Date: Mon, 24 May 2021 00:25:26 +0500 Subject: [PATCH 06/10] [document_store]Test cases refactored. --- haystack/document_store/sql.py | 6 +++++- test/test_document_store.py | 8 ++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/haystack/document_store/sql.py b/haystack/document_store/sql.py index 9d9a12d7bd..599d54fbc0 100644 --- a/haystack/document_store/sql.py +++ b/haystack/document_store/sql.py @@ -73,6 +73,7 @@ def __init__( url: str = "sqlite://", index: str = "document", label_index: str = "label", + duplicate_documents: str = "skip" ): """ An SQL backed DocumentStore. Currently supports SQLite, PostgreSQL and MySQL backends. @@ -91,7 +92,7 @@ def __init__( # save init parameters to enable export of component config as YAML self.set_config( - url=url, index=index, label_index=label_index + url=url, index=index, label_index=label_index, duplicate_documents=duplicate_documents ) engine = create_engine(url) @@ -100,6 +101,7 @@ def __init__( self.session = Session() self.index: str = index self.label_index = label_index + self.duplicate_documents = duplicate_documents if getattr(self, "similarity", None) is None: self.similarity = None self.use_windowed_query = True @@ -289,6 +291,7 @@ def write_documents(self, documents: Union[List[dict], List[Document]], index: O """ index = index or self.index + duplicate_documents = duplicate_documents or self.duplicate_documents if len(documents) == 0: return # Make sure we comply to Document class format @@ -297,6 +300,7 @@ def write_documents(self, documents: Union[List[dict], List[Document]], index: O else: document_objects = documents + document_objects = self.handle_duplicate_documents(document_objects, duplicate_documents) for i in range(0, len(document_objects), batch_size): for doc in document_objects[i: i + batch_size]: meta_fields = doc.meta or {} diff --git a/test/test_document_store.py b/test/test_document_store.py index b1f93e08ea..fe8bda28c9 100644 --- a/test/test_document_store.py +++ b/test/test_document_store.py @@ -44,8 +44,9 @@ def test_write_with_duplicate_doc_ids(document_store): id_hash_keys=["key1"] ) ] + document_store.write_documents(documents, duplicate_documents="skip") with pytest.raises(Exception): - document_store.write_documents(documents) + document_store.write_documents(documents, duplicate_documents="fail") @pytest.mark.elasticsearch @@ -168,15 +169,14 @@ def test_update_existing_documents(document_store, update_existing_documents): {"text": "text1_new", "id": "1", "meta_field_for_count": "a"}, ] - document_store.update_existing_documents = update_existing_documents document_store.write_documents(original_docs) assert document_store.get_document_count() == 1 if update_existing_documents: - document_store.write_documents(updated_docs) + document_store.write_documents(updated_docs, duplicate_documents="overwrite") else: with pytest.raises(Exception): - document_store.write_documents(updated_docs) + document_store.write_documents(updated_docs, duplicate_documents="fail") stored_docs = document_store.get_all_documents() assert len(stored_docs) == 1 From f51e2c02ef6730a2bb1d63ddeddf2774d18b4560 Mon Sep 17 00:00:00 2001 From: Ikram Ali Date: Mon, 24 May 2021 00:59:44 +0500 Subject: [PATCH 07/10] [document_store] mypy issue fixed. --- haystack/document_store/base.py | 3 ++- haystack/document_store/elasticsearch.py | 4 ++-- haystack/document_store/memory.py | 4 ++-- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/haystack/document_store/base.py b/haystack/document_store/base.py index 78e5a4be22..366630dcf1 100644 --- a/haystack/document_store/base.py +++ b/haystack/document_store/base.py @@ -23,7 +23,7 @@ class BaseDocumentStore(BaseComponent): @abstractmethod def write_documents(self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, - duplicate_documents: Optional[str] = None): + batch_size: int = 10_000, duplicate_documents: Optional[str] = None): """ Indexes documents for later queries. @@ -34,6 +34,7 @@ def write_documents(self, documents: Union[List[dict], List[Document]], index: O It can be used for filtering and is accessible in the responses of the Finder. :param index: Optional name of index where the documents shall be written to. If None, the DocumentStore's default index (self.index) will be used. + :param batch_size: Number of documents that are passed to bulk function at a time. :param duplicate_documents: Handle duplicates document based on parameter options. Parameter options : ( 'skip','overwrite','fail') skip (default option): Ignore the duplicates documents diff --git a/haystack/document_store/elasticsearch.py b/haystack/document_store/elasticsearch.py index 11d0cc2d90..944fab7154 100644 --- a/haystack/document_store/elasticsearch.py +++ b/haystack/document_store/elasticsearch.py @@ -305,7 +305,7 @@ def get_document_by_id(self, id: str, index: Optional[str] = None) -> Optional[D else: return None - def get_documents_by_id(self, ids: List[str], index: Optional[str] = None) -> List[Document]: + def get_documents_by_id(self, ids: List[str], index: Optional[str] = None) -> List[Document]: # type: ignore """Fetch documents by specifying a list of text id strings""" index = index or self.index query = {"query": {"ids": {"values": ids}}} @@ -458,7 +458,7 @@ def write_labels( label.updated_at = label.created_at _label = { - "_op_type": "index" if self.update_existing_documents else "create", + "_op_type": "index" if self.duplicate_documents == "overwrite" else "create", "_index": index, **label.to_dict() } # type: Dict[str, Any] diff --git a/haystack/document_store/memory.py b/haystack/document_store/memory.py index 6c3cdc22c5..5863101db8 100644 --- a/haystack/document_store/memory.py +++ b/haystack/document_store/memory.py @@ -69,7 +69,7 @@ def __init__( self.progress_bar = progress_bar self.duplicate_documents = duplicate_documents - def write_documents(self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, + def write_documents(self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, # type: ignore duplicate_documents: Optional[str] = None): """ Indexes documents for later queries. @@ -140,7 +140,7 @@ def get_document_by_id(self, id: str, index: Optional[str] = None) -> Optional[D else: return None - def get_documents_by_id(self, ids: List[str], index: Optional[str] = None) -> List[Document]: + def get_documents_by_id(self, ids: List[str], index: Optional[str] = None) -> List[Document]: # type: ignore """Fetch documents by specifying a list of text id strings""" index = index or self.index documents = [self.indexes[index][id] for id in ids] From 4cad8fb1eccb0d8f1ab558fdeb1cbc7fdcfe4cc3 Mon Sep 17 00:00:00 2001 From: Ikram Ali Date: Mon, 24 May 2021 01:14:58 +0500 Subject: [PATCH 08/10] [test_case] faiss and milvus test case refactored to support duplicate documents implementation. fixed #1069 --- test/test_faiss_and_milvus.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_faiss_and_milvus.py b/test/test_faiss_and_milvus.py index 70b0c8d7da..16f11925fb 100644 --- a/test/test_faiss_and_milvus.py +++ b/test/test_faiss_and_milvus.py @@ -93,7 +93,7 @@ def test_update_docs(document_store, retriever, batch_size): @pytest.mark.parametrize("retriever", ["dpr"], indirect=True) @pytest.mark.parametrize("document_store", ["milvus", "faiss"], indirect=True) def test_update_exiting_docs(document_store, retriever): - document_store.update_existing_documents = True + document_store.duplicate_documents = "overwrite" old_document = Document(text="text_1") # initial write document_store.write_documents([old_document]) From e90ad58cc527fdea38c581d7a0305fa36355fcb5 Mon Sep 17 00:00:00 2001 From: Ikram Ali Date: Mon, 24 May 2021 01:38:10 +0500 Subject: [PATCH 09/10] [document_store] duplicate_documents_options code refactored. --- haystack/document_store/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/haystack/document_store/base.py b/haystack/document_store/base.py index 366630dcf1..e2f9b8cd65 100644 --- a/haystack/document_store/base.py +++ b/haystack/document_store/base.py @@ -19,7 +19,7 @@ class BaseDocumentStore(BaseComponent): index: Optional[str] label_index: Optional[str] similarity: Optional[str] - duplicate_documents_options: List[str] = ['skip', 'overwrite', 'fail'] + duplicate_documents_options: tuple = ('skip', 'overwrite', 'fail') @abstractmethod def write_documents(self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, From e7710dd96f2a12e79b0f735137046228f3f4f56a Mon Sep 17 00:00:00 2001 From: Ikram Ali Date: Tue, 25 May 2021 14:47:38 +0500 Subject: [PATCH 10/10] [document_store] Code refactored. --- haystack/document_store/base.py | 14 +++++--------- haystack/document_store/elasticsearch.py | 8 ++++---- haystack/document_store/faiss.py | 10 +++++----- haystack/document_store/memory.py | 6 +++--- haystack/document_store/milvus.py | 8 ++++---- haystack/document_store/sql.py | 8 ++++---- haystack/errors.py | 7 +++++++ test/test_faiss_and_milvus.py | 2 +- 8 files changed, 33 insertions(+), 30 deletions(-) create mode 100644 haystack/errors.py diff --git a/haystack/document_store/base.py b/haystack/document_store/base.py index e2f9b8cd65..d2ebff85a7 100644 --- a/haystack/document_store/base.py +++ b/haystack/document_store/base.py @@ -6,6 +6,7 @@ import numpy as np from haystack import Document, Label, MultiLabel, BaseComponent +from haystack.errors import DuplicateDocumentError from haystack.preprocessor.preprocessor import PreProcessor from haystack.preprocessor.utils import eval_data_from_json, eval_data_from_jsonl, squad_json_to_jsonl @@ -37,7 +38,7 @@ def write_documents(self, documents: Union[List[dict], List[Document]], index: O :param batch_size: Number of documents that are passed to bulk function at a time. :param duplicate_documents: Handle duplicates document based on parameter options. Parameter options : ( 'skip','overwrite','fail') - skip (default option): Ignore the duplicates documents + skip: Ignore the duplicates documents overwrite: Update any existing documents with the same ID when adding documents. fail: an error is raised if the document ID of the document being added already exists. @@ -229,7 +230,7 @@ def get_documents_by_id(self, ids: List[str], index: Optional[str] = None, batch_size: int = 10_000) -> List[Document]: pass - def drop_duplicate_documents(self, documents: List[Document]) -> List[Document]: + def _drop_duplicate_documents(self, documents: List[Document]) -> List[Document]: """ Drop duplicates documents based on same hash ID @@ -249,7 +250,7 @@ def drop_duplicate_documents(self, documents: List[Document]) -> List[Document]: return _documents - def handle_duplicate_documents(self, documents: List[Document], duplicate_documents: Optional[str] = None): + def _handle_duplicate_documents(self, documents: List[Document], duplicate_documents: Optional[str] = None): """ Handle duplicates documents @@ -263,7 +264,7 @@ def handle_duplicate_documents(self, documents: List[Document], duplicate_docume :return: A list of Haystack Document objects. """ if duplicate_documents in ('skip', 'fail'): - documents = self.drop_duplicate_documents(documents) + documents = self._drop_duplicate_documents(documents) documents_found = self.get_documents_by_id(ids=[doc.id for doc in documents], index=self.index) ids_exist_in_db = [doc.id for doc in documents_found] @@ -274,8 +275,3 @@ def handle_duplicate_documents(self, documents: List[Document], duplicate_docume documents = list(filter(lambda doc: doc.id not in ids_exist_in_db, documents)) return documents - - -class DuplicateDocumentError(Exception): - """Exception for Duplicate document""" - pass diff --git a/haystack/document_store/elasticsearch.py b/haystack/document_store/elasticsearch.py index 944fab7154..72d68d16f9 100644 --- a/haystack/document_store/elasticsearch.py +++ b/haystack/document_store/elasticsearch.py @@ -47,7 +47,7 @@ def __init__( similarity="dot_product", timeout=30, return_embedding: bool = False, - duplicate_documents: str = 'skip', + duplicate_documents: str = 'overwrite', ): """ A DocumentStore using Elasticsearch to store and query the documents for our search. @@ -91,7 +91,7 @@ def __init__( :param return_embedding: To return document embedding :param duplicate_documents: Handle duplicates document based on parameter options. Parameter options : ( 'skip','overwrite','fail') - skip (default option): Ignore the duplicates documents + skip: Ignore the duplicates documents overwrite: Update any existing documents with the same ID when adding documents. fail: an error is raised if the document ID of the document being added already exists. @@ -374,7 +374,7 @@ def write_documents(self, documents: Union[List[dict], List[Document]], index: O :param batch_size: Number of documents that are passed to Elasticsearch's bulk function at a time. :param duplicate_documents: Handle duplicates document based on parameter options. Parameter options : ( 'skip','overwrite','fail') - skip (default option): Ignore the duplicates documents + skip: Ignore the duplicates documents overwrite: Update any existing documents with the same ID when adding documents. fail: an error is raised if the document ID of the document being added already exists. @@ -393,7 +393,7 @@ def write_documents(self, documents: Union[List[dict], List[Document]], index: O field_map = self._create_document_field_map() document_objects = [Document.from_dict(d, field_map=field_map) if isinstance(d, dict) else d for d in documents] - document_objects = self.handle_duplicate_documents(document_objects, duplicate_documents) + document_objects = self._handle_duplicate_documents(document_objects, duplicate_documents) documents_to_index = [] for doc in document_objects: _doc = { diff --git a/haystack/document_store/faiss.py b/haystack/document_store/faiss.py index a506d94a0a..8ee3e46687 100644 --- a/haystack/document_store/faiss.py +++ b/haystack/document_store/faiss.py @@ -41,7 +41,7 @@ def __init__( similarity: str = "dot_product", embedding_field: str = "embedding", progress_bar: bool = True, - duplicate_documents: str = 'skip', + duplicate_documents: str = 'overwrite', **kwargs, ): """ @@ -74,7 +74,7 @@ def __init__( Can be helpful to disable in production deployments to keep the logs clean. :param duplicate_documents: Handle duplicates document based on parameter options. Parameter options : ( 'skip','overwrite','fail') - skip (default option): Ignore the duplicates documents + skip: Ignore the duplicates documents overwrite: Update any existing documents with the same ID when adding documents. fail: an error is raised if the document ID of the document being added already exists. @@ -143,7 +143,7 @@ def write_documents(self, documents: Union[List[dict], List[Document]], index: O :param batch_size: When working with large number of documents, batching can help reduce memory footprint. :param duplicate_documents: Handle duplicates document based on parameter options. Parameter options : ( 'skip','overwrite','fail') - skip (default option): Ignore the duplicates documents + skip: Ignore the duplicates documents overwrite: Update any existing documents with the same ID when adding documents. fail: an error is raised if the document ID of the document being added already exists. @@ -165,11 +165,11 @@ def write_documents(self, documents: Union[List[dict], List[Document]], index: O field_map = self._create_document_field_map() document_objects = [Document.from_dict(d, field_map=field_map) if isinstance(d, dict) else d for d in documents] - document_objects = self.handle_duplicate_documents(document_objects, duplicate_documents) + document_objects = self._handle_duplicate_documents(document_objects, duplicate_documents) add_vectors = False if document_objects[0].embedding is None else True if self.duplicate_documents == "overwrite" and add_vectors: - logger.warning("You have enabled `update_existing_documents` feature and " + logger.warning("You have to provide `duplicate_documents = 'overwrite'` arg and " "`FAISSDocumentStore` does not support update in existing `faiss_index`.\n" "Please call `update_embeddings` method to repopulate `faiss_index`") diff --git a/haystack/document_store/memory.py b/haystack/document_store/memory.py index 5863101db8..e1ab1871da 100644 --- a/haystack/document_store/memory.py +++ b/haystack/document_store/memory.py @@ -31,7 +31,7 @@ def __init__( return_embedding: bool = False, similarity: str = "dot_product", progress_bar: bool = True, - duplicate_documents: str = 'skip', + duplicate_documents: str = 'overwrite', ): """ :param index: The documents are scoped to an index attribute that can be used when writing, querying, @@ -46,7 +46,7 @@ def __init__( Can be helpful to disable in production deployments to keep the logs clean. :param duplicate_documents: Handle duplicates document based on parameter options. Parameter options : ( 'skip','overwrite','fail') - skip (default option): Ignore the duplicates documents + skip: Ignore the duplicates documents overwrite: Update any existing documents with the same ID when adding documents. fail: an error is raised if the document ID of the document being added already exists. @@ -84,7 +84,7 @@ def write_documents(self, documents: Union[List[dict], List[Document]], index: O separate index than the documents for search. :param duplicate_documents: Handle duplicates document based on parameter options. Parameter options : ( 'skip','overwrite','fail') - skip (default option): Ignore the duplicates documents + skip: Ignore the duplicates documents overwrite: Update any existing documents with the same ID when adding documents. fail: an error is raised if the document ID of the document being added already exists. diff --git a/haystack/document_store/milvus.py b/haystack/document_store/milvus.py index 640c2d30bf..bdb8c8b921 100644 --- a/haystack/document_store/milvus.py +++ b/haystack/document_store/milvus.py @@ -50,7 +50,7 @@ def __init__( return_embedding: bool = False, embedding_field: str = "embedding", progress_bar: bool = True, - duplicate_documents: str = 'skip', + duplicate_documents: str = 'overwrite', **kwargs, ): """ @@ -92,7 +92,7 @@ def __init__( Can be helpful to disable in production deployments to keep the logs clean. :param duplicate_documents: Handle duplicates document based on parameter options. Parameter options : ( 'skip','overwrite','fail') - skip (default option): Ignore the duplicates documents + skip: Ignore the duplicates documents overwrite: Update any existing documents with the same ID when adding documents. fail: an error is raised if the document ID of the document being added already exists. @@ -176,7 +176,7 @@ def write_documents(self, documents: Union[List[dict], List[Document]], index: O :param batch_size: When working with large number of documents, batching can help reduce memory footprint. :param duplicate_documents: Handle duplicates document based on parameter options. Parameter options : ( 'skip','overwrite','fail') - skip (default option): Ignore the duplicates documents + skip: Ignore the duplicates documents overwrite: Update any existing documents with the same ID when adding documents. fail: an error is raised if the document ID of the document being added already exists. @@ -195,7 +195,7 @@ def write_documents(self, documents: Union[List[dict], List[Document]], index: O return document_objects = [Document.from_dict(d, field_map=field_map) if isinstance(d, dict) else d for d in documents] - document_objects = self.handle_duplicate_documents(document_objects, duplicate_documents) + document_objects = self._handle_duplicate_documents(document_objects, duplicate_documents) add_vectors = False if document_objects[0].embedding is None else True batched_documents = get_batches_from_generator(document_objects, batch_size) diff --git a/haystack/document_store/sql.py b/haystack/document_store/sql.py index 599d54fbc0..2f9e063782 100644 --- a/haystack/document_store/sql.py +++ b/haystack/document_store/sql.py @@ -73,7 +73,7 @@ def __init__( url: str = "sqlite://", index: str = "document", label_index: str = "label", - duplicate_documents: str = "skip" + duplicate_documents: str = "overwrite" ): """ An SQL backed DocumentStore. Currently supports SQLite, PostgreSQL and MySQL backends. @@ -84,7 +84,7 @@ def __init__( :param label_index: The default value of index attribute for the labels. :param duplicate_documents: Handle duplicates document based on parameter options. Parameter options : ( 'skip','overwrite','fail') - skip (default option): Ignore the duplicates documents + skip: Ignore the duplicates documents overwrite: Update any existing documents with the same ID when adding documents. fail: an error is raised if the document ID of the document being added already exists. @@ -282,7 +282,7 @@ def write_documents(self, documents: Union[List[dict], List[Document]], index: O :param batch_size: When working with large number of documents, batching can help reduce memory footprint. :param duplicate_documents: Handle duplicates document based on parameter options. Parameter options : ( 'skip','overwrite','fail') - skip (default option): Ignore the duplicates documents + skip: Ignore the duplicates documents overwrite: Update any existing documents with the same ID when adding documents. fail: an error is raised if the document ID of the document being added already exists. @@ -300,7 +300,7 @@ def write_documents(self, documents: Union[List[dict], List[Document]], index: O else: document_objects = documents - document_objects = self.handle_duplicate_documents(document_objects, duplicate_documents) + document_objects = self._handle_duplicate_documents(document_objects, duplicate_documents) for i in range(0, len(document_objects), batch_size): for doc in document_objects[i: i + batch_size]: meta_fields = doc.meta or {} diff --git a/haystack/errors.py b/haystack/errors.py new file mode 100644 index 0000000000..a0181c91ae --- /dev/null +++ b/haystack/errors.py @@ -0,0 +1,7 @@ +# coding: utf8 +"""Custom Errors for Haystack stacks""" + + +class DuplicateDocumentError(ValueError): + """Exception for Duplicate document""" + pass diff --git a/test/test_faiss_and_milvus.py b/test/test_faiss_and_milvus.py index 16f11925fb..33bd4d4d14 100644 --- a/test/test_faiss_and_milvus.py +++ b/test/test_faiss_and_milvus.py @@ -92,7 +92,7 @@ def test_update_docs(document_store, retriever, batch_size): @pytest.mark.slow @pytest.mark.parametrize("retriever", ["dpr"], indirect=True) @pytest.mark.parametrize("document_store", ["milvus", "faiss"], indirect=True) -def test_update_exiting_docs(document_store, retriever): +def test_update_existing_docs(document_store, retriever): document_store.duplicate_documents = "overwrite" old_document = Document(text="text_1") # initial write