diff --git a/haystack/document_store/base.py b/haystack/document_store/base.py index 4658de2beb..d2ebff85a7 100644 --- a/haystack/document_store/base.py +++ b/haystack/document_store/base.py @@ -6,6 +6,7 @@ import numpy as np from haystack import Document, Label, MultiLabel, BaseComponent +from haystack.errors import DuplicateDocumentError from haystack.preprocessor.preprocessor import PreProcessor from haystack.preprocessor.utils import eval_data_from_json, eval_data_from_jsonl, squad_json_to_jsonl @@ -19,9 +20,11 @@ class BaseDocumentStore(BaseComponent): index: Optional[str] label_index: Optional[str] similarity: Optional[str] + duplicate_documents_options: tuple = ('skip', 'overwrite', 'fail') @abstractmethod - def write_documents(self, documents: Union[List[dict], List[Document]], index: Optional[str] = None): + def write_documents(self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, + batch_size: int = 10_000, duplicate_documents: Optional[str] = None): """ Indexes documents for later queries. @@ -32,6 +35,13 @@ def write_documents(self, documents: Union[List[dict], List[Document]], index: O It can be used for filtering and is accessible in the responses of the Finder. :param index: Optional name of index where the documents shall be written to. If None, the DocumentStore's default index (self.index) will be used. + :param batch_size: Number of documents that are passed to bulk function at a time. + :param duplicate_documents: Handle duplicates document based on parameter options. + Parameter options : ( 'skip','overwrite','fail') + skip: Ignore the duplicates documents + overwrite: Update any existing documents with the same ID when adding documents. + fail: an error is raised if the document ID of the document being added already + exists. :return: None """ @@ -214,3 +224,54 @@ def delete_documents(self, index: Optional[str] = None, filters: Optional[Dict[s def run(self, documents: List[dict], index: Optional[str] = None, **kwargs): # type: ignore self.write_documents(documents=documents, index=index) return kwargs, "output_1" + + @abstractmethod + def get_documents_by_id(self, ids: List[str], index: Optional[str] = None, + batch_size: int = 10_000) -> List[Document]: + pass + + def _drop_duplicate_documents(self, documents: List[Document]) -> List[Document]: + """ + Drop duplicates documents based on same hash ID + + :param documents: A list of Haystack Document objects. + :return: A list of Haystack Document objects. + """ + _hash_ids: list = [] + _documents: List[Document] = [] + + for document in documents: + if document.id in _hash_ids: + logger.warning(f"Duplicate Documents: Document with id '{document.id}' already exists in index " + f"'{self.index}'") + continue + _documents.append(document) + _hash_ids.append(document.id) + + return _documents + + def _handle_duplicate_documents(self, documents: List[Document], duplicate_documents: Optional[str] = None): + """ + Handle duplicates documents + + :param documents: A list of Haystack Document objects. + :param duplicate_documents: Handle duplicates document based on parameter options. + Parameter options : ( 'skip','overwrite','fail') + skip (default option): Ignore the duplicates documents + overwrite: Update any existing documents with the same ID when adding documents. + fail: an error is raised if the document ID of the document being added already + exists. + :return: A list of Haystack Document objects. + """ + if duplicate_documents in ('skip', 'fail'): + documents = self._drop_duplicate_documents(documents) + documents_found = self.get_documents_by_id(ids=[doc.id for doc in documents], index=self.index) + ids_exist_in_db = [doc.id for doc in documents_found] + + if len(ids_exist_in_db) > 0 and duplicate_documents == 'fail': + raise DuplicateDocumentError(f"Document with ids '{', '.join(ids_exist_in_db)} already exists" + f" in index = '{self.index}'.") + + documents = list(filter(lambda doc: doc.id not in ids_exist_in_db, documents)) + + return documents diff --git a/haystack/document_store/elasticsearch.py b/haystack/document_store/elasticsearch.py index 9f41aef0cc..72d68d16f9 100644 --- a/haystack/document_store/elasticsearch.py +++ b/haystack/document_store/elasticsearch.py @@ -11,7 +11,7 @@ from scipy.special import expit from tqdm.auto import tqdm -from haystack.document_store.base import BaseDocumentStore +from haystack.document_store.base import BaseDocumentStore, DuplicateDocumentError from haystack import Document, Label from haystack.utils import get_batches_from_generator @@ -43,11 +43,11 @@ def __init__( ca_certs: Optional[str] = None, verify_certs: bool = True, create_index: bool = True, - update_existing_documents: bool = False, refresh_type: str = "wait_for", similarity="dot_product", timeout=30, return_embedding: bool = False, + duplicate_documents: str = 'overwrite', ): """ A DocumentStore using Elasticsearch to store and query the documents for our search. @@ -80,11 +80,7 @@ def __init__( :param scheme: 'https' or 'http', protocol used to connect to your elasticsearch instance :param ca_certs: Root certificates for SSL: it is a path to certificate authority (CA) certs on disk. You can use certifi package with certifi.where() to find where the CA certs file is located in your machine. :param verify_certs: Whether to be strict about ca certificates - :param create_index: Whether to try creating a new index (If the index of that name is already existing, we will just continue in any case) - :param update_existing_documents: Whether to update any existing documents with the same ID when adding - documents. When set as True, any document with an existing ID gets updated. - If set to False, an error is raised if the document ID of the document being - added already exists. + :param create_index: Whether to try creating a new index (If the index of that name is already existing, we will just continue in any case :param refresh_type: Type of ES refresh used to control when changes made by a request (e.g. bulk) are made visible to search. If set to 'wait_for', continue only after changes are visible (slow, but safe). If set to 'false', continue directly (fast, but sometimes unintuitive behaviour when docs are not immediately available after ingestion). @@ -93,6 +89,12 @@ def __init__( more performant with DPR embeddings. 'cosine' is recommended if you are using a Sentence BERT model. :param timeout: Number of seconds after which an ElasticSearch request times out. :param return_embedding: To return document embedding + :param duplicate_documents: Handle duplicates document based on parameter options. + Parameter options : ( 'skip','overwrite','fail') + skip: Ignore the duplicates documents + overwrite: Update any existing documents with the same ID when adding documents. + fail: an error is raised if the document ID of the document being added already + exists. """ # save init parameters to enable export of component config as YAML @@ -102,7 +104,7 @@ def __init__( name_field=name_field, embedding_field=embedding_field, embedding_dim=embedding_dim, custom_mapping=custom_mapping, excluded_meta_data=excluded_meta_data, analyzer=analyzer, scheme=scheme, ca_certs=ca_certs, verify_certs=verify_certs, create_index=create_index, - update_existing_documents=update_existing_documents, refresh_type=refresh_type, similarity=similarity, + duplicate_documents=duplicate_documents, refresh_type=refresh_type, similarity=similarity, timeout=timeout, return_embedding=return_embedding, ) @@ -137,7 +139,7 @@ def __init__( self._create_document_index(index) self._create_label_index(label_index) - self.update_existing_documents = update_existing_documents + self.duplicate_documents = duplicate_documents self.refresh_type = refresh_type def _init_elastic_client(self, @@ -303,7 +305,7 @@ def get_document_by_id(self, id: str, index: Optional[str] = None) -> Optional[D else: return None - def get_documents_by_id(self, ids: List[str], index: Optional[str] = None) -> List[Document]: + def get_documents_by_id(self, ids: List[str], index: Optional[str] = None) -> List[Document]: # type: ignore """Fetch documents by specifying a list of text id strings""" index = index or self.index query = {"query": {"ids": {"values": ids}}} @@ -349,9 +351,8 @@ def get_metadata_values_by_key( bucket["value"] = bucket.pop("key") return buckets - def write_documents( - self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, batch_size: int = 10_000 - ): + def write_documents(self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, + batch_size: int = 10_000,duplicate_documents: Optional[str] = None): """ Indexes documents for later queries in Elasticsearch. @@ -371,6 +372,13 @@ def write_documents( should be changed to what you have set for self.text_field and self.name_field. :param index: Elasticsearch index where the documents should be indexed. If not supplied, self.index will be used. :param batch_size: Number of documents that are passed to Elasticsearch's bulk function at a time. + :param duplicate_documents: Handle duplicates document based on parameter options. + Parameter options : ( 'skip','overwrite','fail') + skip: Ignore the duplicates documents + overwrite: Update any existing documents with the same ID when adding documents. + fail: an error is raised if the document ID of the document being added already + exists. + :raises DuplicateDocumentError: Exception trigger on duplicate document :return: None """ @@ -379,17 +387,17 @@ def write_documents( if index is None: index = self.index + duplicate_documents = duplicate_documents or self.duplicate_documents + assert duplicate_documents in self.duplicate_documents_options, \ + f"duplicate_documents parameter must be {', '.join(self.duplicate_documents_options)}" + field_map = self._create_document_field_map() + document_objects = [Document.from_dict(d, field_map=field_map) if isinstance(d, dict) else d for d in documents] + document_objects = self._handle_duplicate_documents(document_objects, duplicate_documents) documents_to_index = [] - for document in documents: - # Make sure we comply to Document class format - if isinstance(document, dict): - doc = Document.from_dict(document, field_map=self._create_document_field_map()) - else: - doc = document - + for doc in document_objects: _doc = { - "_op_type": "index" if self.update_existing_documents else "create", + "_op_type": "index" if duplicate_documents == 'overwrite' else "create", "_index": index, **doc.to_dict(field_map=self._create_document_field_map()) } # type: Dict[str, Any] @@ -450,7 +458,7 @@ def write_labels( label.updated_at = label.created_at _label = { - "_op_type": "index" if self.update_existing_documents else "create", + "_op_type": "index" if self.duplicate_documents == "overwrite" else "create", "_index": index, **label.to_dict() } # type: Dict[str, Any] diff --git a/haystack/document_store/faiss.py b/haystack/document_store/faiss.py index a3a63c7543..8ee3e46687 100644 --- a/haystack/document_store/faiss.py +++ b/haystack/document_store/faiss.py @@ -13,7 +13,7 @@ from haystack.retriever.base import BaseRetriever from haystack.utils import get_batches_from_generator from scipy.special import expit - +from haystack.document_store.base import DuplicateDocumentError logger = logging.getLogger(__name__) @@ -37,11 +37,11 @@ def __init__( faiss_index_factory_str: str = "Flat", faiss_index: Optional["faiss.swigfaiss.Index"] = None, return_embedding: bool = False, - update_existing_documents: bool = False, index: str = "document", similarity: str = "dot_product", embedding_field: str = "embedding", progress_bar: bool = True, + duplicate_documents: str = 'overwrite', **kwargs, ): """ @@ -66,23 +66,25 @@ def __init__( :param faiss_index: Pass an existing FAISS Index, i.e. an empty one that you configured manually or one with docs that you used in Haystack before and want to load again. :param return_embedding: To return document embedding - :param update_existing_documents: Whether to update any existing documents with the same ID when adding - documents. When set as True, any document with an existing ID gets updated. - If set to False, an error is raised if the document ID of the document being - added already exists. :param index: Name of index in document store to use. :param similarity: The similarity function used to compare document vectors. 'dot_product' is the default sine it is more performant with DPR embeddings. 'cosine' is recommended if you are using a Sentence BERT model. :param embedding_field: Name of field containing an embedding vector. :param progress_bar: Whether to show a tqdm progress bar or not. Can be helpful to disable in production deployments to keep the logs clean. + :param duplicate_documents: Handle duplicates document based on parameter options. + Parameter options : ( 'skip','overwrite','fail') + skip: Ignore the duplicates documents + overwrite: Update any existing documents with the same ID when adding documents. + fail: an error is raised if the document ID of the document being added already + exists. """ # save init parameters to enable export of component config as YAML self.set_config( sql_url=sql_url, vector_dim=vector_dim, faiss_index_factory_str=faiss_index_factory_str, faiss_index=faiss_index, return_embedding=return_embedding, - update_existing_documents=update_existing_documents, index=index, similarity=similarity, + duplicate_documents=duplicate_documents, index=index, similarity=similarity, embedding_field=embedding_field, progress_bar=progress_bar ) @@ -107,10 +109,10 @@ def __init__( raise ValueError("The FAISS document store can currently only support dot_product similarity. " "Please set similarity=\"dot_product\"") self.progress_bar = progress_bar + self.duplicate_documents = duplicate_documents super().__init__( url=sql_url, - update_existing_documents=update_existing_documents, index=index ) @@ -130,9 +132,8 @@ def _create_new_index(self, vector_dim: int, metric_type, index_factory: str = " index = faiss.index_factory(vector_dim, index_factory, metric_type) return index - def write_documents( - self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, batch_size: int = 10_000 - ): + def write_documents(self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, + batch_size: int = 10_000, duplicate_documents: Optional[str] = None): """ Add new documents to the DocumentStore. @@ -140,10 +141,21 @@ def write_documents( them right away in FAISS. If not, you can later call update_embeddings() to create & index them. :param index: (SQL) index name for storing the docs and metadata :param batch_size: When working with large number of documents, batching can help reduce memory footprint. + :param duplicate_documents: Handle duplicates document based on parameter options. + Parameter options : ( 'skip','overwrite','fail') + skip: Ignore the duplicates documents + overwrite: Update any existing documents with the same ID when adding documents. + fail: an error is raised if the document ID of the document being added already + exists. + :raises DuplicateDocumentError: Exception trigger on duplicate document :return: """ index = index or self.index + duplicate_documents = duplicate_documents or self.duplicate_documents + assert duplicate_documents in self.duplicate_documents_options, \ + f"duplicate_documents parameter must be {', '.join(self.duplicate_documents_options)}" + if not self.faiss_indexes.get(index): self.faiss_indexes[index] = self._create_new_index( vector_dim=self.vector_dim, @@ -153,11 +165,11 @@ def write_documents( field_map = self._create_document_field_map() document_objects = [Document.from_dict(d, field_map=field_map) if isinstance(d, dict) else d for d in documents] - + document_objects = self._handle_duplicate_documents(document_objects, duplicate_documents) add_vectors = False if document_objects[0].embedding is None else True - if self.update_existing_documents and add_vectors: - logger.warning("You have enabled `update_existing_documents` feature and " + if self.duplicate_documents == "overwrite" and add_vectors: + logger.warning("You have to provide `duplicate_documents = 'overwrite'` arg and " "`FAISSDocumentStore` does not support update in existing `faiss_index`.\n" "Please call `update_embeddings` method to repopulate `faiss_index`") @@ -176,7 +188,8 @@ def write_documents( vector_id += 1 docs_to_write_in_sql.append(doc) - super(FAISSDocumentStore, self).write_documents(docs_to_write_in_sql, index=index) + super(FAISSDocumentStore, self).write_documents(docs_to_write_in_sql, index=index, + duplicate_documents=duplicate_documents) def _create_document_field_map(self) -> Dict: return { diff --git a/haystack/document_store/memory.py b/haystack/document_store/memory.py index 56962c014b..e1ab1871da 100644 --- a/haystack/document_store/memory.py +++ b/haystack/document_store/memory.py @@ -10,7 +10,7 @@ from tqdm import tqdm from haystack import Document, Label -from haystack.document_store.base import BaseDocumentStore +from haystack.document_store.base import BaseDocumentStore, DuplicateDocumentError from haystack.retriever.base import BaseRetriever from haystack.utils import get_batches_from_generator @@ -31,6 +31,7 @@ def __init__( return_embedding: bool = False, similarity: str = "dot_product", progress_bar: bool = True, + duplicate_documents: str = 'overwrite', ): """ :param index: The documents are scoped to an index attribute that can be used when writing, querying, @@ -43,12 +44,19 @@ def __init__( more performant with DPR embeddings. 'cosine' is recommended if you are using a Sentence BERT model. :param progress_bar: Whether to show a tqdm progress bar or not. Can be helpful to disable in production deployments to keep the logs clean. + :param duplicate_documents: Handle duplicates document based on parameter options. + Parameter options : ( 'skip','overwrite','fail') + skip: Ignore the duplicates documents + overwrite: Update any existing documents with the same ID when adding documents. + fail: an error is raised if the document ID of the document being added already + exists. """ # save init parameters to enable export of component config as YAML self.set_config( - index=index, label_index=label_index, embedding_field=embedding_field, embedding_dim=embedding_dim, - return_embedding=return_embedding, similarity=similarity, progress_bar=progress_bar, + index=index, label_index=label_index, embedding_field=embedding_field, embedding_dim=embedding_dim, + return_embedding=return_embedding, similarity=similarity, progress_bar=progress_bar, + duplicate_documents=duplicate_documents, ) self.indexes: Dict[str, Dict] = defaultdict(dict) @@ -59,8 +67,10 @@ def __init__( self.return_embedding = return_embedding self.similarity = similarity self.progress_bar = progress_bar + self.duplicate_documents = duplicate_documents - def write_documents(self, documents: Union[List[dict], List[Document]], index: Optional[str] = None): + def write_documents(self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, # type: ignore + duplicate_documents: Optional[str] = None): """ Indexes documents for later queries. @@ -72,18 +82,34 @@ def write_documents(self, documents: Union[List[dict], List[Document]], index: O It can be used for filtering and is accessible in the responses of the Finder. :param index: write documents to a custom namespace. For instance, documents for evaluation can be indexed in a separate index than the documents for search. + :param duplicate_documents: Handle duplicates document based on parameter options. + Parameter options : ( 'skip','overwrite','fail') + skip: Ignore the duplicates documents + overwrite: Update any existing documents with the same ID when adding documents. + fail: an error is raised if the document ID of the document being added already + exists. + :raises DuplicateDocumentError: Exception trigger on duplicate document :return: None """ index = index or self.index + duplicate_documents = duplicate_documents or self.duplicate_documents + assert duplicate_documents in self.duplicate_documents_options, \ + f"duplicate_documents parameter must be {', '.join(self.duplicate_documents_options)}" field_map = self._create_document_field_map() documents = deepcopy(documents) - documents_objects = [Document.from_dict(d, field_map=field_map) if isinstance(d, dict) else d for d in documents] + documents_objects = [Document.from_dict(d, field_map=field_map) if isinstance(d, dict) else d for d in + documents] for document in documents_objects: if document.id in self.indexes[index]: - # TODO Make error type consistent across document stores and add user options to deal with duplicate documents (ignore, overwrite, fail) - raise ValueError(f"Duplicate Documents: write_documents() failed - Document with id '{document.id} already exists in index '{index}'") + if duplicate_documents == "fail": + raise DuplicateDocumentError(f"Document with id '{document.id} already " + f"exists in index '{index}'") + elif duplicate_documents == "skip": + logger.warning(f"Duplicate Documents: Document with id '{document.id} already exists in index " + f"'{index}'") + continue self.indexes[index][document.id] = document def _create_document_field_map(self): @@ -114,7 +140,7 @@ def get_document_by_id(self, id: str, index: Optional[str] = None) -> Optional[D else: return None - def get_documents_by_id(self, ids: List[str], index: Optional[str] = None) -> List[Document]: + def get_documents_by_id(self, ids: List[str], index: Optional[str] = None) -> List[Document]: # type: ignore """Fetch documents by specifying a list of text id strings""" index = index or self.index documents = [self.indexes[index][id] for id in ids] diff --git a/haystack/document_store/milvus.py b/haystack/document_store/milvus.py index a40744b02b..bdb8c8b921 100644 --- a/haystack/document_store/milvus.py +++ b/haystack/document_store/milvus.py @@ -12,6 +12,7 @@ from haystack.document_store.sql import SQLDocumentStore from haystack.retriever.base import BaseRetriever from haystack.utils import get_batches_from_generator +from haystack.document_store.base import DuplicateDocumentError logger = logging.getLogger(__name__) @@ -46,10 +47,10 @@ def __init__( index_type: IndexType = IndexType.FLAT, index_param: Optional[Dict[str, Any]] = None, search_param: Optional[Dict[str, Any]] = None, - update_existing_documents: bool = False, return_embedding: bool = False, embedding_field: str = "embedding", progress_bar: bool = True, + duplicate_documents: str = 'overwrite', **kwargs, ): """ @@ -85,21 +86,23 @@ def __init__( :param search_param: Configuration parameters for the chose index_type needed at query time For example: {"nprobe": 10} as the number of cluster units to query for index_type IVF_FLAT. See https://milvus.io/docs/v1.0.0/index.md - :param update_existing_documents: Whether to update any existing documents with the same ID when adding - documents. When set as True, any document with an existing ID gets updated. - If set to False, an error is raised if the document ID of the document being - added already exists. :param return_embedding: To return document embedding. :param embedding_field: Name of field containing an embedding vector. :param progress_bar: Whether to show a tqdm progress bar or not. Can be helpful to disable in production deployments to keep the logs clean. + :param duplicate_documents: Handle duplicates document based on parameter options. + Parameter options : ( 'skip','overwrite','fail') + skip: Ignore the duplicates documents + overwrite: Update any existing documents with the same ID when adding documents. + fail: an error is raised if the document ID of the document being added already + exists. """ # save init parameters to enable export of component config as YAML self.set_config( sql_url=sql_url, milvus_url=milvus_url, connection_pool=connection_pool, index=index, vector_dim=vector_dim, index_file_size=index_file_size, similarity=similarity, index_type=index_type, index_param=index_param, - search_param=search_param, update_existing_documents=update_existing_documents, + search_param=search_param, duplicate_documents=duplicate_documents, return_embedding=return_embedding, embedding_field=embedding_field, progress_bar=progress_bar, ) @@ -122,10 +125,10 @@ def __init__( self.return_embedding = return_embedding self.embedding_field = embedding_field self.progress_bar = progress_bar + self.duplicate_documents = duplicate_documents super().__init__( url=sql_url, - update_existing_documents=update_existing_documents, index=index ) @@ -162,9 +165,8 @@ def _create_document_field_map(self) -> Dict: self.index: self.embedding_field, } - def write_documents( - self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, batch_size: int = 10_000 - ): + def write_documents(self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, + batch_size: int = 10_000, duplicate_documents: Optional[str] = None): """ Add new documents to the DocumentStore. @@ -172,9 +174,19 @@ def write_documents( them right away in Milvus. If not, you can later call update_embeddings() to create & index them. :param index: (SQL) index name for storing the docs and metadata :param batch_size: When working with large number of documents, batching can help reduce memory footprint. + :param duplicate_documents: Handle duplicates document based on parameter options. + Parameter options : ( 'skip','overwrite','fail') + skip: Ignore the duplicates documents + overwrite: Update any existing documents with the same ID when adding documents. + fail: an error is raised if the document ID of the document being added already + exists. + :raises DuplicateDocumentError: Exception trigger on duplicate document :return: """ index = index or self.index + duplicate_documents = duplicate_documents or self.duplicate_documents + assert duplicate_documents in self.duplicate_documents_options, \ + f"duplicate_documents parameter must be {', '.join(self.duplicate_documents_options)}" self._create_collection_and_index_if_not_exist(index) field_map = self._create_document_field_map() @@ -183,7 +195,7 @@ def write_documents( return document_objects = [Document.from_dict(d, field_map=field_map) if isinstance(d, dict) else d for d in documents] - + document_objects = self._handle_duplicate_documents(document_objects, duplicate_documents) add_vectors = False if document_objects[0].embedding is None else True batched_documents = get_batches_from_generator(document_objects, batch_size) @@ -203,7 +215,7 @@ def write_documents( raise AttributeError(f'Format of supplied document embedding {type(doc.embedding)} is not ' f'supported. Please use list or numpy.ndarray') - if self.update_existing_documents: + if duplicate_documents == 'overwrite': existing_docs = super().get_documents_by_id(ids=doc_ids, index=index) self._delete_vector_ids_from_milvus(documents=existing_docs, index=index) @@ -218,12 +230,12 @@ def write_documents( meta["vector_id"] = vector_ids[idx] docs_to_write_in_sql.append(doc) - super().write_documents(docs_to_write_in_sql, index=index) + super().write_documents(docs_to_write_in_sql, index=index, duplicate_documents=duplicate_documents) progress_bar.update(batch_size) progress_bar.close() self.milvus_server.flush([index]) - if self.update_existing_documents: + if duplicate_documents == 'overwrite': self.milvus_server.compact(collection_name=index) def update_embeddings( diff --git a/haystack/document_store/sql.py b/haystack/document_store/sql.py index 27dc1caaf0..2f9e063782 100644 --- a/haystack/document_store/sql.py +++ b/haystack/document_store/sql.py @@ -73,7 +73,7 @@ def __init__( url: str = "sqlite://", index: str = "document", label_index: str = "label", - update_existing_documents: bool = False, + duplicate_documents: str = "overwrite" ): """ An SQL backed DocumentStore. Currently supports SQLite, PostgreSQL and MySQL backends. @@ -82,16 +82,17 @@ def __init__( :param index: The documents are scoped to an index attribute that can be used when writing, querying, or deleting documents. This parameter sets the default value for document index. :param label_index: The default value of index attribute for the labels. - :param update_existing_documents: Whether to update any existing documents with the same ID when adding - documents. When set as True, any document with an existing ID gets updated. - If set to False, an error is raised if the document ID of the document being - added already exists. Using this parameter could cause performance degradation - for document insertion. + :param duplicate_documents: Handle duplicates document based on parameter options. + Parameter options : ( 'skip','overwrite','fail') + skip: Ignore the duplicates documents + overwrite: Update any existing documents with the same ID when adding documents. + fail: an error is raised if the document ID of the document being added already + exists. """ # save init parameters to enable export of component config as YAML self.set_config( - url=url, index=index, label_index=label_index, update_existing_documents=update_existing_documents + url=url, index=index, label_index=label_index, duplicate_documents=duplicate_documents ) engine = create_engine(url) @@ -100,7 +101,7 @@ def __init__( self.session = Session() self.index: str = index self.label_index = label_index - self.update_existing_documents = update_existing_documents + self.duplicate_documents = duplicate_documents if getattr(self, "similarity", None) is None: self.similarity = None self.use_windowed_query = True @@ -266,9 +267,8 @@ def get_all_labels(self, index=None, filters: Optional[dict] = None): return labels - def write_documents( - self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, batch_size: int = 10_000 - ): + def write_documents(self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, + batch_size: int = 10_000, duplicate_documents: Optional[str] = None): """ Indexes documents for later queries. @@ -280,11 +280,18 @@ def write_documents( :param index: add an optional index attribute to documents. It can be later used for filtering. For instance, documents for evaluation can be indexed in a separate index than the documents for search. :param batch_size: When working with large number of documents, batching can help reduce memory footprint. + :param duplicate_documents: Handle duplicates document based on parameter options. + Parameter options : ( 'skip','overwrite','fail') + skip: Ignore the duplicates documents + overwrite: Update any existing documents with the same ID when adding documents. + fail: an error is raised if the document ID of the document being added already + exists. :return: None """ index = index or self.index + duplicate_documents = duplicate_documents or self.duplicate_documents if len(documents) == 0: return # Make sure we comply to Document class format @@ -293,13 +300,14 @@ def write_documents( else: document_objects = documents + document_objects = self._handle_duplicate_documents(document_objects, duplicate_documents) for i in range(0, len(document_objects), batch_size): for doc in document_objects[i: i + batch_size]: meta_fields = doc.meta or {} vector_id = meta_fields.pop("vector_id", None) meta_orms = [MetaORM(name=key, value=value) for key, value in meta_fields.items()] doc_orm = DocumentORM(id=doc.id, text=doc.text, vector_id=vector_id, meta=meta_orms, index=index) - if self.update_existing_documents: + if duplicate_documents == "overwrite": # First old meta data cleaning is required self.session.query(MetaORM).filter_by(document_id=doc.id).delete() self.session.merge(doc_orm) diff --git a/haystack/errors.py b/haystack/errors.py new file mode 100644 index 0000000000..a0181c91ae --- /dev/null +++ b/haystack/errors.py @@ -0,0 +1,7 @@ +# coding: utf8 +"""Custom Errors for Haystack stacks""" + + +class DuplicateDocumentError(ValueError): + """Exception for Duplicate document""" + pass diff --git a/test/test_document_store.py b/test/test_document_store.py index b1f93e08ea..fe8bda28c9 100644 --- a/test/test_document_store.py +++ b/test/test_document_store.py @@ -44,8 +44,9 @@ def test_write_with_duplicate_doc_ids(document_store): id_hash_keys=["key1"] ) ] + document_store.write_documents(documents, duplicate_documents="skip") with pytest.raises(Exception): - document_store.write_documents(documents) + document_store.write_documents(documents, duplicate_documents="fail") @pytest.mark.elasticsearch @@ -168,15 +169,14 @@ def test_update_existing_documents(document_store, update_existing_documents): {"text": "text1_new", "id": "1", "meta_field_for_count": "a"}, ] - document_store.update_existing_documents = update_existing_documents document_store.write_documents(original_docs) assert document_store.get_document_count() == 1 if update_existing_documents: - document_store.write_documents(updated_docs) + document_store.write_documents(updated_docs, duplicate_documents="overwrite") else: with pytest.raises(Exception): - document_store.write_documents(updated_docs) + document_store.write_documents(updated_docs, duplicate_documents="fail") stored_docs = document_store.get_all_documents() assert len(stored_docs) == 1 diff --git a/test/test_faiss_and_milvus.py b/test/test_faiss_and_milvus.py index 70b0c8d7da..33bd4d4d14 100644 --- a/test/test_faiss_and_milvus.py +++ b/test/test_faiss_and_milvus.py @@ -92,8 +92,8 @@ def test_update_docs(document_store, retriever, batch_size): @pytest.mark.slow @pytest.mark.parametrize("retriever", ["dpr"], indirect=True) @pytest.mark.parametrize("document_store", ["milvus", "faiss"], indirect=True) -def test_update_exiting_docs(document_store, retriever): - document_store.update_existing_documents = True +def test_update_existing_docs(document_store, retriever): + document_store.duplicate_documents = "overwrite" old_document = Document(text="text_1") # initial write document_store.write_documents([old_document])