From b432bec18f01a8b4afe124c52c8abd6d19db085a Mon Sep 17 00:00:00 2001 From: venuraja79 Date: Sun, 21 Feb 2021 19:06:30 +0530 Subject: [PATCH 01/22] Annotation Tool: data is not persisted when using local version #853 --- annotation_tool/docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/annotation_tool/docker-compose.yml b/annotation_tool/docker-compose.yml index 7b88a9490e..9620f1dd5b 100644 --- a/annotation_tool/docker-compose.yml +++ b/annotation_tool/docker-compose.yml @@ -31,7 +31,7 @@ services: ports: - "5432:5432" volumes: - - ./postgres-data:/var/lib/psql/data + - ./postgres-data:/var/lib/postgresql/data networks: - app-network restart: unless-stopped From 70a3fa9927ae6b447e309808a6334195a6b54745 Mon Sep 17 00:00:00 2001 From: venuraja79 Date: Sat, 15 May 2021 20:48:58 +0530 Subject: [PATCH 02/22] First version of weaviate --- haystack/document_store/weaviate.py | 584 ++++++++++++++++++++++++++++ test/conftest.py | 42 +- test/test_weaviate.py | 0 3 files changed, 624 insertions(+), 2 deletions(-) create mode 100644 haystack/document_store/weaviate.py create mode 100644 test/test_weaviate.py diff --git a/haystack/document_store/weaviate.py b/haystack/document_store/weaviate.py new file mode 100644 index 0000000000..e1f0c1e1e0 --- /dev/null +++ b/haystack/document_store/weaviate.py @@ -0,0 +1,584 @@ +import logging +from typing import Any, Dict, Generator, List, Optional, Union +import numpy as np +from tqdm import tqdm + +from haystack import Document +from haystack.document_store.base import BaseDocumentStore +from haystack.utils import get_batches_from_generator + +from weaviate import client, auth, AuthClientPassword +from weaviate import ObjectsBatchRequest + +logger = logging.getLogger(__name__) + +class WeaviateDocumentStore(BaseDocumentStore): + """ + Milvus (https://milvus.io/) is a highly reliable, scalable Document Store specialized on storing and processing vectors. 
+ Therefore, it is particularly suited for Haystack users that work with dense retrieval methods (like DPR). + In contrast to FAISS, Milvus ... + - runs as a separate service (e.g. a Docker container) and can scale easily in a distributed environment + - allows dynamic data management (i.e. you can insert/delete vectors without recreating the whole index) + - encapsulates multiple ANN libraries (FAISS, ANNOY ...) + + This class uses Milvus for all vector related storage, processing and querying. + The meta-data (e.g. for filtering) and the document text are however stored in a separate SQL Database as Milvus + does not allow these data types (yet). + + Usage: + 1. Start a Weaviate server (see https://www.semi.technology/developers/weaviate/current/getting-started/installation.html) + 2. Init a WeaviateDocumentStore in Haystack + """ + + def __init__( + self, + weaviate_url: str = "http://localhost:8080", + timeout_config: tuple = (5, 15), + username: str = None, + password: str = None, + index: str = "Document", + vector_dim: int = 768, + text_field: str = "text", + name_field: str = "name", + faq_question_field = "question", + similarity: str = "dot_product", + index_type: str = "hnsw", + custom_schema: Optional[dict] = None, + module_name: str = "text2vec-transformers", + index_param: Optional[Dict[str, Any]] = None, + search_param: Optional[Dict[str, Any]] = None, + update_existing_documents: bool = False, + return_embedding: bool = False, + embedding_field: str = "embedding", + progress_bar: bool = True, + **kwargs, + ): + """ + :param sql_url: SQL connection URL for storing document texts and metadata. It defaults to a local, file based SQLite DB. For large scale + deployment, Postgres is recommended. If using MySQL then same server can also be used for + Milvus metadata. For more details see https://milvus.io/docs/v1.0.0/data_manage.md. + :param milvus_url: Milvus server connection URL for storing and processing vectors. 
+ Protocol, host and port will automatically be inferred from the URL. + See https://milvus.io/docs/v1.0.0/install_milvus.md for instructions to start a Milvus instance. + :param connection_pool: Connection pool type to connect with Milvus server. Default: "SingletonThread". + :param index: Index name for text, embedding and metadata (in Milvus terms, this is the "collection name"). + :param vector_dim: The embedding vector size. Default: 768. + :param index_file_size: Specifies the size of each segment file that is stored by Milvus and its default value is 1024 MB. + When the size of newly inserted vectors reaches the specified volume, Milvus packs these vectors into a new segment. + Milvus creates one index file for each segment. When conducting a vector search, Milvus searches all index files one by one. + As a rule of thumb, we would see a 30% ~ 50% increase in the search performance after changing the value of index_file_size from 1024 to 2048. + Note that an overly large index_file_size value may cause failure to load a segment into the memory or graphics memory. + (From https://milvus.io/docs/v1.0.0/performance_faq.md#How-can-I-get-the-best-performance-from-Milvus-through-setting-index_file_size) + :param similarity: The similarity function used to compare document vectors. 'dot_product' is the default and recommended for DPR embeddings. + 'cosine' is recommended for Sentence Transformers, but is not directly supported by Milvus. + However, you can normalize your embeddings and use `dot_product` to get the same results. + See https://milvus.io/docs/v1.0.0/metric.md?Inner-product-(IP)#floating. + :param index_type: Type of approximate nearest neighbour (ANN) index used. The choice here determines your tradeoff between speed and accuracy. 
+ Some popular options: + - FLAT (default): Exact method, slow + - IVF_FLAT, inverted file based heuristic, fast + - HSNW: Graph based, fast + - ANNOY: Tree based, fast + See: https://milvus.io/docs/v1.0.0/index.md + :param index_param: Configuration parameters for the chose index_type needed at indexing time. + For example: {"nlist": 16384} as the number of cluster units to create for index_type IVF_FLAT. + See https://milvus.io/docs/v1.0.0/index.md + :param search_param: Configuration parameters for the chose index_type needed at query time + For example: {"nprobe": 10} as the number of cluster units to query for index_type IVF_FLAT. + See https://milvus.io/docs/v1.0.0/index.md + :param update_existing_documents: Whether to update any existing documents with the same ID when adding + documents. When set as True, any document with an existing ID gets updated. + If set to False, an error is raised if the document ID of the document being + added already exists. + :param return_embedding: To return document embedding. + :param embedding_field: Name of field containing an embedding vector. + :param progress_bar: Whether to show a tqdm progress bar or not. + Can be helpful to disable in production deployments to keep the logs clean. 
+ """ + + # save init parameters to enable export of component config as YAML + self.set_config( + weaviate_url=weaviate_url, timeout_config=timeout_config, username=username, password=password, + index=index, index_type=index_type, custom_schema=custom_schema, module_name=module_name, + vector_dim=vector_dim, text_field=text_field, name_field=name_field, faq_question_field=faq_question_field, + similarity=similarity, index_param=index_param,search_param=search_param, update_existing_documents=update_existing_documents, + return_embedding=return_embedding, embedding_field=embedding_field, progress_bar=progress_bar, + ) + + credentials = auth.AuthClientPassword(username, password) + + if username and password: + secret = AuthClientPassword(username, password) + self.weaviate_client = client.Client(url=weaviate_url, + auth_client_secret=secret, + timeout_config=timeout_config) + else: + self.weaviate_client = client.Client(url=weaviate_url, + timeout_config=timeout_config) + + # Test connection + try: + status = self.weaviate_client.is_ready() + if not status: + raise ConnectionError( + f"Initial connection to Weaviate failed. Make sure you run Weaviate instance " + f"at `{weaviate_url}` and that it has finished the initial ramp up (can take > 30s)." + ) + except Exception: + raise ConnectionError( + f"Initial connection to Weaviate failed. Make sure you run Weaviate instance " + f"at `{weaviate_url}` and that it has finished the initial ramp up (can take > 30s)." 
+ ) + + self.vector_dim = vector_dim + self.text_field = text_field + self.name_field = name_field + self.faq_question_field = faq_question_field + self.index_type = index_type + self.custom_schema = custom_schema + self.module_name = module_name + self.index = index + self.index_param = index_param or {"nlist": 16384} + self.search_param = search_param or {"nprobe": 10} + self.return_embedding = return_embedding + self.embedding_field = embedding_field + self.progress_bar = progress_bar + + self._create_schema_and_index_if_not_exist(self.index) + + #def __del__(self): + #return self.milvus_server.close() + + def _create_schema_and_index_if_not_exist( + self, + index: Optional[str] = None, + index_param: Optional[Dict[str, Any]] = None + ): + index = index or self.index + index_param = index_param or self.index_param + + if self.custom_schema: + schema = self.custom_schema + else: + schema = { + "classes": [ + { + "class": index, + "description": "Haystack default class", + "invertedIndexConfig": { + "cleanupIntervalSeconds": 60 + }, + "moduleConfig": { + "text2vec-transformers": { + "poolingStrategy": "masked_mean", + "vectorizeClassName": False + } + }, + "properties": [ + { + "dataType": [ + "string" + ], + "description": "Name Field", + "moduleConfig": { + "text2vec-transformers": { + "skip": False, + "vectorizePropertyName": False + } + }, + "name": self.name_field + }, + { + "dataType": [ + "text" + ], + "description": "Document Text", + "moduleConfig": { + "text2vec-transformers": { + "skip": True, + "vectorizePropertyName": True + } + }, + "name": self.text_field + }, + { + "dataType": [ + "string" + ], + "description": "Document meta", + "moduleConfig": { + "text2vec-transformers": { + "skip": False, + "vectorizePropertyName": False + } + }, + "name": "meta" + } + ], + "vectorIndexConfig": { + "cleanupIntervalSeconds": 300, + "maxConnections": 64, + "efConstruction": 128, + "vectorCacheMaxObjects": 500000 + }, + "vectorIndexType": "hnsw", + "vectorizer": 
"text2vec-transformers" + } + ] + } + if not self.weaviate_client.schema.contains(schema): + self.weaviate_client.schema.create(schema) + else: + logger.warning(f"Schema already exists in Weaviate {schema}") + + def _convert_weaviate_result_to_document( + self, + result: dict, + return_embedding: bool + ) -> Document: + import ast + # By default, the result json will have the following fields + id = result.get("id") + embedding = result.get("vector") + + # Weaviate Get method returns the data items in properties key, + # Weaviate query doesn't have a properties key. + props = result.get("properties") + if not props: + props = result + + text = props.get(self.text_field) + question = props.get(self.faq_question_field) + + # We put all additional data of the doc into meta_data and return it in the API + meta_data = {k:v for k,v in props.items() if k not in (self.text_field, self.faq_question_field, self.embedding_field)} + name = meta_data.pop(self.name_field, None) + if name: + meta_data["name"] = name + + score = result.get("_additional").get('certainty') if result.get("_additional").get('certainty') else None + if score: + probability = score + else: + probability = None + + id = result.get("_additional").get('id') if result.get("_additional").get('id') else None + + if return_embedding: + if not embedding: + embedding = result.get("_additional").get("vector") + if embedding: + embedding = np.asarray(embedding, dtype=np.float32) + + document = Document( + id=id, + text=text, + meta=meta_data, + score=score, + probability=probability, + question=question, + embedding=embedding, + ) + return document + + def _create_document_field_map(self) -> Dict: + return { + self.text_field: "text", + self.embedding_field: "embedding", + self.faq_question_field if self.faq_question_field else "question": "question" + } + + def get_document_by_id(self, id: str, index: Optional[str] = None) -> Optional[Document]: + """Fetch a document by specifying its text id string""" + 
'''{'class': 'Document', + 'creationTimeUnix': 1621075584724, + 'id': '1bad51b7-bd77-485d-8871-21c50fab248f', + 'properties': {'meta': "{'key1':'value1'}", + 'name': 'name_5', + 'text': 'text_5'}, + 'vector': []}''' + index = index or self.index + result = self.weaviate_client.data_object.get_by_id(id, with_vector=True) + document = self._convert_weaviate_result_to_document(result) + return document + + def get_documents_by_id(self, ids: List[str], index: Optional[str] = None) -> List[Document]: + """Fetch documents by specifying a list of text id strings""" + index = index or self.index + docs = [] + for id in ids: + docs.append(self.get_document_by_id(id)) + return docs + + def write_documents( + self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, batch_size: int = 10_000 + ): + """ + Add new documents to the DocumentStore. + + :param documents: List of `Dicts` or List of `Documents`. If they already contain the embeddings, we'll index + them right away in Milvus. If not, you can later call update_embeddings() to create & index them. + :param index: (SQL) index name for storing the docs and metadata + :param batch_size: When working with large number of documents, batching can help reduce memory footprint. 
+ :return: + """ + index = index or self.index + self._create_schema_and_index_if_not_exist(index) + field_map = self._create_document_field_map() + + if len(documents) == 0: + logger.warning("Calling DocumentStore.write_documents() with empty list") + return + + document_objects = [Document.from_dict(d, field_map=field_map) if isinstance(d, dict) else d for d in documents] + + batched_documents = get_batches_from_generator(document_objects, batch_size) + with tqdm(total=len(document_objects), disable=not self.progress_bar) as progress_bar: + for document_batch in batched_documents: + docs_batch = ObjectsBatchRequest() + for idx, doc in enumerate(document_batch): + vector = None + _ = doc.pop("score", None) + _ = doc.pop("probability", None) + if "meta" in doc.keys(): + doc["meta"] = str(doc.get("meta")) + doc_id = str(doc.pop("id")) + if doc[self.embedding_field] is not None: + if type(doc[self.embedding_field]) == np.ndarray: + vector = doc.pop[self.embedding_field].tolist() + vector = doc.pop(self.embedding_field) + docs_batch.add(doc, class_name=self.index, uuid=doc_id, vector=vector) + + self.weaviate_client.batch.create(docs_batch) + progress_bar.update(batch_size) + progress_bar.close() + + def update_document_meta(self, id: str, meta: Dict[str, str]): + """ + Update the metadata dictionary of a document by specifying its string id + """ + body = {"meta": meta} + self.weaviate_client.data_object.update(body, class_name=self.index, uuid=id) + + def get_document_count(self, filters: Optional[Dict[str, List[str]]] = None, index: Optional[str] = None) -> int: + """ + Return the number of documents in the document store. + """ + pass + + def get_all_documents( + self, + index: Optional[str] = None, + filters: Optional[Dict[str, List[str]]] = None, + return_embedding: Optional[bool] = None, + batch_size: int = 10_000, + ) -> List[Document]: + """ + Get documents from the document store. + + :param index: Name of the index to get the documents from. 
If None, the + DocumentStore's default index (self.index) will be used. + :param filters: Optional filters to narrow down the documents to return. + Example: {"name": ["some", "more"], "category": ["only_one"]} + :param return_embedding: Whether to return the document embeddings. + :param batch_size: When working with large number of documents, batching can help reduce memory footprint. + """ + result = self.get_all_documents_generator( + index=index, filters=filters, return_embedding=return_embedding, batch_size=batch_size + ) + documents = list(result) + return documents + + def _get_all_documents_in_index( + self, + index: str, + filters: Optional[Dict[str, List[str]]] = None, + batch_size: int = 10_000, + only_documents_without_embedding: bool = False, + ) -> Generator[dict, None, None]: + """ + Return all documents in a specific index in the document store + """ + where_filter = { + "operator": "And", + "operands": [] + } + + if filters: + operands = [] + for key, values in filters.items(): + operands.append( + { + "path": [key], + "operator": "equal", + "valueString": [values] + } + ) + where_filter = { + "operator": "And", + "operands": [operands] + } + + if only_documents_without_embedding: + raise OSError() + + result = self.weaviate_client.query.get(class_name=self.index, properties=[])\ + .with_where(where_filter)\ + .with_limit(batch_size)\ + .do() + yield from result + + def get_all_documents_generator( + self, + index: Optional[str] = None, + filters: Optional[Dict[str, List[str]]] = None, + return_embedding: Optional[bool] = None, + batch_size: int = 10_000, + ) -> Generator[Document, None, None]: + """ + Get documents from the document store. Under-the-hood, documents are fetched in batches from the + document store and yielded as individual documents. This method can be used to iteratively process + a large number of documents without having to load all documents in memory. + + :param index: Name of the index to get the documents from. 
If None, the + DocumentStore's default index (self.index) will be used. + :param filters: Optional filters to narrow down the documents to return. + Example: {"name": ["some", "more"], "category": ["only_one"]} + :param return_embedding: Whether to return the document embeddings. + :param batch_size: When working with large number of documents, batching can help reduce memory footprint. + """ + + if index is None: + index = self.index + + if return_embedding is None: + return_embedding = self.return_embedding + + results = self._get_all_documents_in_index(index=index, filters=filters, batch_size=batch_size) + for result in results: + document = self._convert_weaviate_result_to_document(result, return_embedding=return_embedding) + yield document + + def query( + self, + query: Optional[str], + filters: Optional[Dict[str, List[str]]] = None, + top_k: int = 10, + custom_query: Optional[str] = None, + index: Optional[str] = None, + ) -> List[Document]: + """ + Scan through documents in DocumentStore and return a small number documents + that are most relevant to the query as defined by the BM25 algorithm. + + :param query: The query + :param filters: A dictionary where the keys specify a metadata field and the value is a list of accepted values for that field + :param top_k: How many documents to return per query. 
+ :param index: The name of the index in the DocumentStore from which to retrieve documents + """ + + if filters: + logger.warning("Query filters are not implemented for the WeaviateDocumentStore.") + + index = index or self.index + + if custom_query: + query_output = self.weaviate_client.query.raw(custom_query) + else: + query_string = { + "concepts" : [query] + } + query_output = self.weaviate_client.query\ + .get(class_name=index, properties=[self.text_field,"_additional {id, certainty}"])\ + .with_near_text(query_string)\ + .with_limit(top_k)\ + .do() + + results = query_output.get("data").get("Get").get(self.index) + documents = [] + for result in results: + doc = self._convert_weaviate_result_to_document(result) + documents.append(doc) + + return documents + + def query_by_embedding(self, + query_emb: np.ndarray, + filters: Optional[dict] = None, + top_k: int = 10, + index: Optional[str] = None, + return_embedding: Optional[bool] = None) -> List[Document]: + """ + Find the document that is most similar to the provided `query_emb` by using a vector similarity metric. + + :param query_emb: Embedding of the query (e.g. gathered from DPR) + :param filters: Optional filters to narrow down the search space. 
+ Example: {"name": ["some", "more"], "category": ["only_one"]} + :param top_k: How many documents to return + :param index: (SQL) index name for storing the docs and metadata + :param return_embedding: To return document embedding + :return: + """ + if filters: + logger.warning("Query filters are not implemented for the WeaviateDocumentStore.") + + if return_embedding is None: + return_embedding = self.return_embedding + index = index or self.index + + query_emb = query_emb.reshape(1, -1).astype(np.float32) + query_string = { + "vector" : query_emb + } + query_output = self.weaviate_client.query\ + .get(class_name=index, properties=[self.text_field,"_additional {id, certainty}"])\ + .with_near_vector(query_string)\ + .with_limit(top_k)\ + .do() + + results = query_output.get("data").get("Get").get(self.index) + documents = [] + for result in results: + doc = self._convert_weaviate_result_to_document(result) + documents.append(doc) + + return documents + + def update_embeddings( + self, + retriever, + index: Optional[str] = None, + filters: Optional[Dict[str, List[str]]] = None, + update_existing_embeddings: bool = True, + batch_size: int = 10_000 + ): + """ + Updates the embeddings in the the document store using the encoding model specified in the retriever. + This can be useful if want to add or change the embeddings for your documents (e.g. after changing the retriever config). + + :param retriever: Retriever to use to update the embeddings. + :param index: Index name to update + :param update_existing_embeddings: Whether to update existing embeddings of the documents. If set to False, + only documents without embeddings are processed. This mode can be used for + incremental updating of embeddings, wherein, only newly indexed documents + get processed. + :param filters: Optional filters to narrow down the documents for which embeddings are to be updated. 
+ Example: {"name": ["some", "more"], "category": ["only_one"]} + :param batch_size: When working with large number of documents, batching can help reduce memory footprint. + :return: None + """ + raise RuntimeError("Weaviate store produces embeddings by default based on the configuration in " + "schema. Update embeddings isn't implemented for this store!") + + + def delete_all_documents(self, index: Optional[str] = None, filters: Optional[Dict[str, List[str]]] = None): + """ + Delete all documents (from SQL AND Milvus). + :param index: (SQL) index name for storing the docs and metadata + :param filters: Optional filters to narrow down the search space. + Example: {"name": ["some", "more"], "category": ["only_one"]} + :return: None + """ + index = index or self.index + self.weaviate_client.schema.delete_class(index) + diff --git a/test/conftest.py b/test/conftest.py index 42aac7e064..9876bde522 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -8,6 +8,7 @@ from elasticsearch import Elasticsearch from haystack.knowledge_graph.graphdb import GraphDBKnowledgeGraph from milvus import Milvus +import weaviate from haystack.document_store.milvus import MilvusDocumentStore from haystack.generator.transformers import RAGenerator, RAGeneratorType @@ -18,6 +19,7 @@ from haystack import Document from haystack.document_store.elasticsearch import ElasticsearchDocumentStore +from haystack.document_store.weaviate import WeaviateDocumentStore from haystack.document_store.faiss import FAISSDocumentStore from haystack.document_store.memory import InMemoryDocumentStore from haystack.document_store.sql import SQLDocumentStore @@ -99,6 +101,37 @@ def milvus_fixture(): time.sleep(40) +@pytest.fixture(scope="session") +def weaviate_fixture(): + # test if a Milvus server is already running. If not, start Milvus docker container locally. 
+ # Make sure you have given > 6GB memory to docker engine + try: + milvus_server = weaviate.Client(url='http://localhost:8080', timeout_config=(5, 15)) + milvus_server.is_ready() + except: + print("Starting Weaviate ...") + status = subprocess.run( + ['docker rm haystack_test_weaviate'], + shell=True + ) + status = subprocess.run( + ['docker rm haystack_test_weaviate_transformers'], + shell=True + ) + status = subprocess.run( + ['docker run -d --name haystack_test_weaviate -p 8080:8080 semitechnologies/weaviate'], + shell=True + ) + status = subprocess.run( + ['docker run -d --name haystack_test_weaviate_transformers semitechnologies/transformers-inference'], + shell=True + ) + if status.returncode: + raise Exception( + "Failed to launch Weaviate. Please check docker container logs.") + time.sleep(60) + + @pytest.fixture(scope="session") def graphdb_fixture(): # test if a GraphDB instance is already running. If not, download and start a GraphDB instance locally. @@ -311,13 +344,12 @@ def document_store_with_docs(request, test_docs_xs): document_store.delete_all_documents() -@pytest.fixture(params=["elasticsearch", "faiss", "memory", "sql", "milvus"]) +@pytest.fixture(params=["elasticsearch", "faiss", "memory", "sql", "milvus", "weaviate"]) def document_store(request, test_docs_xs): document_store = get_document_store(request.param) yield document_store document_store.delete_all_documents() - def get_document_store(document_store_type, embedding_field="embedding"): if document_store_type == "sql": document_store = SQLDocumentStore(url="sqlite://", index="haystack_test") @@ -352,6 +384,12 @@ def get_document_store(document_store_type, embedding_field="embedding"): if collection.startswith("haystack_test"): document_store.milvus_server.drop_collection(collection) return document_store + elif document_store_type == "weaviate": + document_store = WeaviateDocumentStore( + weaviate_url="http://localhost:8080", + index="Haystack_test" + ) + 
document_store.weaviate_client.schema.delete_class("Haystack_test") else: raise Exception(f"No document store fixture for '{document_store_type}'") diff --git a/test/test_weaviate.py b/test/test_weaviate.py new file mode 100644 index 0000000000..e69de29bb2 From ee835db6f3a612b6f028c254c985f3e446225c59 Mon Sep 17 00:00:00 2001 From: venuraja79 Date: Sat, 15 May 2021 20:54:11 +0530 Subject: [PATCH 03/22] First version of weaviate --- test/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/conftest.py b/test/conftest.py index 9876bde522..a295ea3084 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -109,7 +109,7 @@ def weaviate_fixture(): milvus_server = weaviate.Client(url='http://localhost:8080', timeout_config=(5, 15)) milvus_server.is_ready() except: - print("Starting Weaviate ...") + print("Starting Weaviate servers ...") status = subprocess.run( ['docker rm haystack_test_weaviate'], shell=True From 30cda6a951086320ef8470e198b121001aef028a Mon Sep 17 00:00:00 2001 From: venuraja79 Date: Sat, 15 May 2021 22:31:46 +0530 Subject: [PATCH 04/22] First version of weaviate --- haystack/document_store/weaviate.py | 72 ++++++++++++++++++----------- test/conftest.py | 13 +++--- 2 files changed, 52 insertions(+), 33 deletions(-) diff --git a/haystack/document_store/weaviate.py b/haystack/document_store/weaviate.py index e1f0c1e1e0..b17a498166 100644 --- a/haystack/document_store/weaviate.py +++ b/haystack/document_store/weaviate.py @@ -105,8 +105,6 @@ def __init__( return_embedding=return_embedding, embedding_field=embedding_field, progress_bar=progress_bar, ) - credentials = auth.AuthClientPassword(username, password) - if username and password: secret = AuthClientPassword(username, password) self.weaviate_client = client.Client(url=weaviate_url, @@ -188,6 +186,19 @@ def _create_schema_and_index_if_not_exist( }, "name": self.name_field }, + { + "dataType": [ + "string" + ], + "description": "Question Field", + "moduleConfig": { + 
"text2vec-transformers": { + "skip": False, + "vectorizePropertyName": False + } + }, + "name": self.faq_question_field + }, { "dataType": [ "text" @@ -240,6 +251,8 @@ def _convert_weaviate_result_to_document( # By default, the result json will have the following fields id = result.get("id") embedding = result.get("vector") + score = None + probability = None # Weaviate Get method returns the data items in properties key, # Weaviate query doesn't have a properties key. @@ -256,13 +269,12 @@ def _convert_weaviate_result_to_document( if name: meta_data["name"] = name - score = result.get("_additional").get('certainty') if result.get("_additional").get('certainty') else None - if score: - probability = score - else: - probability = None + if result.get("_additional"): + score = result.get("_additional").get('certainty') if result.get("_additional").get('certainty') else None + if score: + probability = score - id = result.get("_additional").get('id') if result.get("_additional").get('id') else None + id = result.get("_additional").get('id') if result.get("_additional").get('id') else None if return_embedding: if not embedding: @@ -299,7 +311,7 @@ def get_document_by_id(self, id: str, index: Optional[str] = None) -> Optional[D 'vector': []}''' index = index or self.index result = self.weaviate_client.data_object.get_by_id(id, with_vector=True) - document = self._convert_weaviate_result_to_document(result) + document = self._convert_weaviate_result_to_document(result, return_embedding=True) return document def get_documents_by_id(self, ids: List[str], index: Optional[str] = None) -> List[Document]: @@ -337,19 +349,26 @@ def write_documents( for document_batch in batched_documents: docs_batch = ObjectsBatchRequest() for idx, doc in enumerate(document_batch): - vector = None - _ = doc.pop("score", None) - _ = doc.pop("probability", None) - if "meta" in doc.keys(): - doc["meta"] = str(doc.get("meta")) - doc_id = str(doc.pop("id")) - if doc[self.embedding_field] is not 
None: - if type(doc[self.embedding_field]) == np.ndarray: - vector = doc.pop[self.embedding_field].tolist() - vector = doc.pop(self.embedding_field) - docs_batch.add(doc, class_name=self.index, uuid=doc_id, vector=vector) - - self.weaviate_client.batch.create(docs_batch) + _doc = { + **doc.to_dict(field_map=self._create_document_field_map()) + } + _ = _doc.pop("score", None) + _ = _doc.pop("probability", None) + if "meta" in _doc.keys(): + _doc["meta"] = str(_doc.get("meta")) + doc_id = str(_doc.pop("id")) + vector = _doc.pop(self.embedding_field) + if _doc.get(self.faq_question_field) is None: + _doc.pop(self.faq_question_field) + if vector: + docs_batch.add(_doc, class_name=self.index, uuid=doc_id, vector=vector) + else: + docs_batch.add(_doc, class_name=self.index, uuid=doc_id) + + outputs = self.weaviate_client.batch.create(docs_batch) + for output in outputs: + if output.get('result').get('errors'): + print(output.get('result').get('errors')) progress_bar.update(batch_size) progress_bar.close() @@ -422,11 +441,10 @@ def _get_all_documents_in_index( if only_documents_without_embedding: raise OSError() - result = self.weaviate_client.query.get(class_name=self.index, properties=[])\ - .with_where(where_filter)\ + result = self.weaviate_client.query.get(class_name=self.index, properties=[self.text_field,"_additional {id, certainty}"])\ .with_limit(batch_size)\ .do() - yield from result + yield from result.get("data").get("Get").get(self.index) def get_all_documents_generator( self, @@ -497,7 +515,7 @@ def query( results = query_output.get("data").get("Get").get(self.index) documents = [] for result in results: - doc = self._convert_weaviate_result_to_document(result) + doc = self._convert_weaviate_result_to_document(result, return_embedding=True) documents.append(doc) return documents @@ -539,7 +557,7 @@ def query_by_embedding(self, results = query_output.get("data").get("Get").get(self.index) documents = [] for result in results: - doc = 
self._convert_weaviate_result_to_document(result) + doc = self._convert_weaviate_result_to_document(result, return_embedding=True) documents.append(doc) return documents diff --git a/test/conftest.py b/test/conftest.py index a295ea3084..9d5becdddb 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -106,8 +106,8 @@ def weaviate_fixture(): # test if a Milvus server is already running. If not, start Milvus docker container locally. # Make sure you have given > 6GB memory to docker engine try: - milvus_server = weaviate.Client(url='http://localhost:8080', timeout_config=(5, 15)) - milvus_server.is_ready() + weaviate_server = weaviate.Client(url='http://localhost:8080', timeout_config=(5, 15)) + weaviate_server.is_ready() except: print("Starting Weaviate servers ...") status = subprocess.run( @@ -336,7 +336,7 @@ def get_retriever(retriever_type, document_store): return retriever -@pytest.fixture(params=["elasticsearch", "faiss", "memory", "sql", "milvus"]) +@pytest.fixture(params=["weaviate"]) def document_store_with_docs(request, test_docs_xs): document_store = get_document_store(request.param) document_store.write_documents(test_docs_xs) @@ -344,7 +344,7 @@ def document_store_with_docs(request, test_docs_xs): document_store.delete_all_documents() -@pytest.fixture(params=["elasticsearch", "faiss", "memory", "sql", "milvus", "weaviate"]) +@pytest.fixture(params=["weaviate"]) def document_store(request, test_docs_xs): document_store = get_document_store(request.param) yield document_store @@ -387,9 +387,10 @@ def get_document_store(document_store_type, embedding_field="embedding"): elif document_store_type == "weaviate": document_store = WeaviateDocumentStore( weaviate_url="http://localhost:8080", - index="Haystack_test" + index="Haystacktest" ) - document_store.weaviate_client.schema.delete_class("Haystack_test") + document_store.weaviate_client.schema.delete_class("Haystacktest") + return document_store else: raise Exception(f"No document store fixture for 
'{document_store_type}'") From c10a54e63c511135fd1d31f32f49a5f26fbd6e4a Mon Sep 17 00:00:00 2001 From: venuraja79 Date: Sun, 16 May 2021 09:39:15 +0530 Subject: [PATCH 05/22] Updated comments --- haystack/document_store/weaviate.py | 158 ++++++++++++---------------- test/conftest.py | 47 +-------- 2 files changed, 69 insertions(+), 136 deletions(-) diff --git a/haystack/document_store/weaviate.py b/haystack/document_store/weaviate.py index b17a498166..1195c92dfa 100644 --- a/haystack/document_store/weaviate.py +++ b/haystack/document_store/weaviate.py @@ -14,16 +14,17 @@ class WeaviateDocumentStore(BaseDocumentStore): """ - Milvus (https://milvus.io/) is a highly reliable, scalable Document Store specialized on storing and processing vectors. - Therefore, it is particularly suited for Haystack users that work with dense retrieval methods (like DPR). - In contrast to FAISS, Milvus ... - - runs as a separate service (e.g. a Docker container) and can scale easily in a distributed environment - - allows dynamic data management (i.e. you can insert/delete vectors without recreating the whole index) - - encapsulates multiple ANN libraries (FAISS, ANNOY ...) + https://www.semi.technology/developers/weaviate/current/index.html#what-is-weaviate + Weaviate is a cloud-native, modular, real-time vector search engine built to scale your machine learning models. + 1. Highly efficient large scale vector searches + 2. State of the Art classification techniques + 3. Easily retrieve your data in graph format using GraphQL - This class uses Milvus for all vector related storage, processing and querying. - The meta-data (e.g. for filtering) and the document text are however stored in a separate SQL Database as Milvus - does not allow these data types (yet). + Weaviate is a document store to store, query the documents for search + Both the document and embedding (generated by Weaviate) are stored in weaviate. 
+ + Weaviate python client is used to connect to the server, more details are here + https://weaviate-python-client.readthedocs.io/en/docs/weaviate.html Usage: 1. Start a Weaviate server (see https://www.semi.technology/developers/weaviate/current/getting-started/installation.html) @@ -45,8 +46,6 @@ def __init__( index_type: str = "hnsw", custom_schema: Optional[dict] = None, module_name: str = "text2vec-transformers", - index_param: Optional[Dict[str, Any]] = None, - search_param: Optional[Dict[str, Any]] = None, update_existing_documents: bool = False, return_embedding: bool = False, embedding_field: str = "embedding", @@ -54,38 +53,25 @@ def __init__( **kwargs, ): """ - :param sql_url: SQL connection URL for storing document texts and metadata. It defaults to a local, file based SQLite DB. For large scale - deployment, Postgres is recommended. If using MySQL then same server can also be used for - Milvus metadata. For more details see https://milvus.io/docs/v1.0.0/data_manage.md. - :param milvus_url: Milvus server connection URL for storing and processing vectors. - Protocol, host and port will automatically be inferred from the URL. - See https://milvus.io/docs/v1.0.0/install_milvus.md for instructions to start a Milvus instance. - :param connection_pool: Connection pool type to connect with Milvus server. Default: "SingletonThread". - :param index: Index name for text, embedding and metadata (in Milvus terms, this is the "collection name"). + :param weaviate_url: Weaviate server connection URL for storing and processing documents and vectors. + For more details, refer "https://www.semi.technology/developers/weaviate/current/getting-started/installation.html" + :param timeout_config: Weaviate Timeout config as a tuple of (retries, time out seconds). 
+ :param username: username (standard authentication via http_auth) + :param password: password (standard authentication via http_auth) + :param index: Index name for document text, embedding and metadata (in Weaviate terminology, this is a "Class" in Weaviate schema). :param vector_dim: The embedding vector size. Default: 768. - :param index_file_size: Specifies the size of each segment file that is stored by Milvus and its default value is 1024 MB. - When the size of newly inserted vectors reaches the specified volume, Milvus packs these vectors into a new segment. - Milvus creates one index file for each segment. When conducting a vector search, Milvus searches all index files one by one. - As a rule of thumb, we would see a 30% ~ 50% increase in the search performance after changing the value of index_file_size from 1024 to 2048. - Note that an overly large index_file_size value may cause failure to load a segment into the memory or graphics memory. - (From https://milvus.io/docs/v1.0.0/performance_faq.md#How-can-I-get-the-best-performance-from-Milvus-through-setting-index_file_size) - :param similarity: The similarity function used to compare document vectors. 'dot_product' is the default and recommended for DPR embeddings. - 'cosine' is recommended for Sentence Transformers, but is not directly supported by Milvus. - However, you can normalize your embeddings and use `dot_product` to get the same results. - See https://milvus.io/docs/v1.0.0/metric.md?Inner-product-(IP)#floating. - :param index_type: Type of approximate nearest neighbour (ANN) index used. The choice here determines your tradeoff between speed and accuracy. - Some popular options: - - FLAT (default): Exact method, slow - - IVF_FLAT, inverted file based heuristic, fast - - HSNW: Graph based, fast - - ANNOY: Tree based, fast - See: https://milvus.io/docs/v1.0.0/index.md - :param index_param: Configuration parameters for the chose index_type needed at indexing time. 
- For example: {"nlist": 16384} as the number of cluster units to create for index_type IVF_FLAT. - See https://milvus.io/docs/v1.0.0/index.md - :param search_param: Configuration parameters for the chose index_type needed at query time - For example: {"nprobe": 10} as the number of cluster units to query for index_type IVF_FLAT. - See https://milvus.io/docs/v1.0.0/index.md + :param text_field: Name of field that might contain the answer and will therefore be passed to the Reader Model (e.g. "full_text"). + If no Reader is used (e.g. in FAQ-Style QA) the plain content of this field will just be returned. + :param name_field: Name of field that contains the title of the the doc + :param faq_question_field: Name of field containing the question in case of FAQ-Style QA + :param similarity: The similarity function used to compare document vectors. 'dot_product' is the default. + :param index_type: Index type of any vector object defined in weaviate schema. The vector index type is pluggable. + Currently, HSNW is only supported. + See: https://www.semi.technology/developers/weaviate/current/more-resources/performance.html + :param custom_schema: Allows to create custom schema in Weaviate, for more details + See https://www.semi.technology/developers/weaviate/current/data-schema/schema-configuration.html + :param module_name : Vectorization module to convert data into vectors. Default is "text2vec-trasnformers" + For more details, See https://www.semi.technology/developers/weaviate/current/modules/ :param update_existing_documents: Whether to update any existing documents with the same ID when adding documents. When set as True, any document with an existing ID gets updated. 
If set to False, an error is raised if the document ID of the document being @@ -99,12 +85,13 @@ def __init__( # save init parameters to enable export of component config as YAML self.set_config( weaviate_url=weaviate_url, timeout_config=timeout_config, username=username, password=password, - index=index, index_type=index_type, custom_schema=custom_schema, module_name=module_name, - vector_dim=vector_dim, text_field=text_field, name_field=name_field, faq_question_field=faq_question_field, - similarity=similarity, index_param=index_param,search_param=search_param, update_existing_documents=update_existing_documents, + index=index, vector_dim=vector_dim, text_field=text_field, name_field=name_field, + faq_question_field=faq_question_field, similarity=similarity, index_type=index_type, + custom_schema=custom_schema, module_name=module_name, update_existing_documents=update_existing_documents, return_embedding=return_embedding, embedding_field=embedding_field, progress_bar=progress_bar, ) + # Connect to Weaviate server using python binding if username and password: secret = AuthClientPassword(username, password) self.weaviate_client = client.Client(url=weaviate_url, @@ -114,7 +101,7 @@ def __init__( self.weaviate_client = client.Client(url=weaviate_url, timeout_config=timeout_config) - # Test connection + # Test Weaviate connection try: status = self.weaviate_client.is_ready() if not status: @@ -127,17 +114,16 @@ def __init__( f"Initial connection to Weaviate failed. Make sure you run Weaviate instance " f"at `{weaviate_url}` and that it has finished the initial ramp up (can take > 30s)." 
) - + self.index = index self.vector_dim = vector_dim self.text_field = text_field self.name_field = name_field self.faq_question_field = faq_question_field + self.similarity = similarity self.index_type = index_type self.custom_schema = custom_schema self.module_name = module_name - self.index = index - self.index_param = index_param or {"nlist": 16384} - self.search_param = search_param or {"nprobe": 10} + self.update_existing_documents = update_existing_documents self.return_embedding = return_embedding self.embedding_field = embedding_field self.progress_bar = progress_bar @@ -150,10 +136,9 @@ def __init__( def _create_schema_and_index_if_not_exist( self, index: Optional[str] = None, - index_param: Optional[Dict[str, Any]] = None ): + """Create a new index(schema/class in Weaviate)for storing documents in case if an index (schema) with the name doesn't exist already.""" index = index or self.index - index_param = index_param or self.index_param if self.custom_schema: schema = self.custom_schema @@ -232,30 +217,32 @@ def _create_schema_and_index_if_not_exist( "efConstruction": 128, "vectorCacheMaxObjects": 500000 }, - "vectorIndexType": "hnsw", - "vectorizer": "text2vec-transformers" + "vectorIndexType": self.index_type, + "vectorizer": self.module_name } ] } if not self.weaviate_client.schema.contains(schema): self.weaviate_client.schema.create(schema) - else: - logger.warning(f"Schema already exists in Weaviate {schema}") def _convert_weaviate_result_to_document( self, result: dict, return_embedding: bool ) -> Document: - import ast - # By default, the result json will have the following fields - id = result.get("id") - embedding = result.get("vector") + """ + Convert weaviate result dict into haystack document object. This is more involved because + weaviate search result dict varies between get and query interfaces. + Weaviate get methods return the data items in properties key, whereas the query doesn't. 
+ """ score = None probability = None - # Weaviate Get method returns the data items in properties key, - # Weaviate query doesn't have a properties key. + id = result.get("id") + embedding = result.get("vector") + + # If properties key is present, the fields are in this key. + # otherwise, a direct lookup in result dict props = result.get("properties") if not props: props = result @@ -269,6 +256,7 @@ def _convert_weaviate_result_to_document( if name: meta_data["name"] = name + # Weaviate creates "_additional" key for semantic search if result.get("_additional"): score = result.get("_additional").get('certainty') if result.get("_additional").get('certainty') else None if score: @@ -302,6 +290,7 @@ def _create_document_field_map(self) -> Dict: def get_document_by_id(self, id: str, index: Optional[str] = None) -> Optional[Document]: """Fetch a document by specifying its text id string""" + # Sample result dict from a get method '''{'class': 'Document', 'creationTimeUnix': 1621075584724, 'id': '1bad51b7-bd77-485d-8871-21c50fab248f', @@ -329,8 +318,8 @@ def write_documents( Add new documents to the DocumentStore. :param documents: List of `Dicts` or List of `Documents`. If they already contain the embeddings, we'll index - them right away in Milvus. If not, you can later call update_embeddings() to create & index them. - :param index: (SQL) index name for storing the docs and metadata + them right away in Weaviate. If not, the embeddings are automatically generated by Weaviate. + :param index: index name for storing the docs and metadata :param batch_size: When working with large number of documents, batching can help reduce memory footprint. :return: """ @@ -383,6 +372,7 @@ def get_document_count(self, filters: Optional[Dict[str, List[str]]] = None, ind """ Return the number of documents in the document store. 
""" + #TODO: Figuring out how to do this using weaviate python client pass def get_all_documents( @@ -402,6 +392,7 @@ def get_all_documents( :param return_embedding: Whether to return the document embeddings. :param batch_size: When working with large number of documents, batching can help reduce memory footprint. """ + index = index or self.index result = self.get_all_documents_generator( index=index, filters=filters, return_embedding=return_embedding, batch_size=batch_size ) @@ -418,33 +409,14 @@ def _get_all_documents_in_index( """ Return all documents in a specific index in the document store """ - where_filter = { - "operator": "And", - "operands": [] - } - + index = index or self.index if filters: - operands = [] - for key, values in filters.items(): - operands.append( - { - "path": [key], - "operator": "equal", - "valueString": [values] - } - ) - where_filter = { - "operator": "And", - "operands": [operands] - } - - if only_documents_without_embedding: - raise OSError() + raise OSError("Filters are not supported currently in WeaviateDocumentStore!") - result = self.weaviate_client.query.get(class_name=self.index, properties=[self.text_field,"_additional {id, certainty}"])\ + result = self.weaviate_client.query.get(class_name=index, properties=[self.text_field,"_additional {id, certainty}"])\ .with_limit(batch_size)\ .do() - yield from result.get("data").get("Get").get(self.index) + yield from result.get("data").get("Get").get(index) def get_all_documents_generator( self, @@ -487,7 +459,7 @@ def query( ) -> List[Document]: """ Scan through documents in DocumentStore and return a small number documents - that are most relevant to the query as defined by the BM25 algorithm. + that are most relevant to the query as defined by Weaviate semantic search. 
:param query: The query :param filters: A dictionary where the keys specify a metadata field and the value is a list of accepted values for that field @@ -533,7 +505,7 @@ def query_by_embedding(self, :param filters: Optional filters to narrow down the search space. Example: {"name": ["some", "more"], "category": ["only_one"]} :param top_k: How many documents to return - :param index: (SQL) index name for storing the docs and metadata + :param index: index name for storing the docs and metadata :param return_embedding: To return document embedding :return: """ @@ -591,10 +563,10 @@ def update_embeddings( def delete_all_documents(self, index: Optional[str] = None, filters: Optional[Dict[str, List[str]]] = None): """ - Delete all documents (from SQL AND Milvus). - :param index: (SQL) index name for storing the docs and metadata - :param filters: Optional filters to narrow down the search space. - Example: {"name": ["some", "more"], "category": ["only_one"]} + Delete documents in an index. All documents are deleted if no filters are passed. + + :param index: Index name to delete the document from. + :param filters: Optional filters to narrow down the documents to be deleted. 
:return: None """ index = index or self.index diff --git a/test/conftest.py b/test/conftest.py index 9d5becdddb..dd2512f5a3 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -8,7 +8,6 @@ from elasticsearch import Elasticsearch from haystack.knowledge_graph.graphdb import GraphDBKnowledgeGraph from milvus import Milvus -import weaviate from haystack.document_store.milvus import MilvusDocumentStore from haystack.generator.transformers import RAGenerator, RAGeneratorType @@ -19,7 +18,6 @@ from haystack import Document from haystack.document_store.elasticsearch import ElasticsearchDocumentStore -from haystack.document_store.weaviate import WeaviateDocumentStore from haystack.document_store.faiss import FAISSDocumentStore from haystack.document_store.memory import InMemoryDocumentStore from haystack.document_store.sql import SQLDocumentStore @@ -101,37 +99,6 @@ def milvus_fixture(): time.sleep(40) -@pytest.fixture(scope="session") -def weaviate_fixture(): - # test if a Milvus server is already running. If not, start Milvus docker container locally. - # Make sure you have given > 6GB memory to docker engine - try: - weaviate_server = weaviate.Client(url='http://localhost:8080', timeout_config=(5, 15)) - weaviate_server.is_ready() - except: - print("Starting Weaviate servers ...") - status = subprocess.run( - ['docker rm haystack_test_weaviate'], - shell=True - ) - status = subprocess.run( - ['docker rm haystack_test_weaviate_transformers'], - shell=True - ) - status = subprocess.run( - ['docker run -d --name haystack_test_weaviate -p 8080:8080 semitechnologies/weaviate'], - shell=True - ) - status = subprocess.run( - ['docker run -d --name haystack_test_weaviate_transformers semitechnologies/transformers-inference'], - shell=True - ) - if status.returncode: - raise Exception( - "Failed to launch Weaviate. 
Please check docker container logs.") - time.sleep(60) - - @pytest.fixture(scope="session") def graphdb_fixture(): # test if a GraphDB instance is already running. If not, download and start a GraphDB instance locally. @@ -336,7 +303,7 @@ def get_retriever(retriever_type, document_store): return retriever -@pytest.fixture(params=["weaviate"]) +@pytest.fixture(params=["elasticsearch", "faiss", "memory", "sql", "milvus"]) def document_store_with_docs(request, test_docs_xs): document_store = get_document_store(request.param) document_store.write_documents(test_docs_xs) @@ -344,12 +311,13 @@ def document_store_with_docs(request, test_docs_xs): document_store.delete_all_documents() -@pytest.fixture(params=["weaviate"]) +@pytest.fixture(params=["elasticsearch", "faiss", "memory", "sql", "milvus"]) def document_store(request, test_docs_xs): document_store = get_document_store(request.param) yield document_store document_store.delete_all_documents() + def get_document_store(document_store_type, embedding_field="embedding"): if document_store_type == "sql": document_store = SQLDocumentStore(url="sqlite://", index="haystack_test") @@ -384,14 +352,7 @@ def get_document_store(document_store_type, embedding_field="embedding"): if collection.startswith("haystack_test"): document_store.milvus_server.drop_collection(collection) return document_store - elif document_store_type == "weaviate": - document_store = WeaviateDocumentStore( - weaviate_url="http://localhost:8080", - index="Haystacktest" - ) - document_store.weaviate_client.schema.delete_class("Haystacktest") - return document_store else: raise Exception(f"No document store fixture for '{document_store_type}'") - return document_store + return document_store \ No newline at end of file From 1e69d2850bdafccac7e6fc52fa2d607a230aed5e Mon Sep 17 00:00:00 2001 From: venuraja79 Date: Sun, 16 May 2021 09:41:48 +0530 Subject: [PATCH 06/22] Updated comments --- test/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) 
diff --git a/test/conftest.py b/test/conftest.py index dd2512f5a3..42aac7e064 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -355,4 +355,4 @@ def get_document_store(document_store_type, embedding_field="embedding"): else: raise Exception(f"No document store fixture for '{document_store_type}'") - return document_store \ No newline at end of file + return document_store From 0c7977dfad2814b19819c62461fcb9c58aa1ac77 Mon Sep 17 00:00:00 2001 From: venuraja79 Date: Sun, 16 May 2021 12:27:27 +0530 Subject: [PATCH 07/22] ran query, get and write tests --- haystack/document_store/weaviate.py | 60 +++++++++++++++++++---------- 1 file changed, 39 insertions(+), 21 deletions(-) diff --git a/haystack/document_store/weaviate.py b/haystack/document_store/weaviate.py index 1195c92dfa..2a6036f9ed 100644 --- a/haystack/document_store/weaviate.py +++ b/haystack/document_store/weaviate.py @@ -42,6 +42,7 @@ def __init__( text_field: str = "text", name_field: str = "name", faq_question_field = "question", + meta_field = "meta", similarity: str = "dot_product", index_type: str = "hnsw", custom_schema: Optional[dict] = None, @@ -64,6 +65,7 @@ def __init__( If no Reader is used (e.g. in FAQ-Style QA) the plain content of this field will just be returned. :param name_field: Name of field that contains the title of the the doc :param faq_question_field: Name of field containing the question in case of FAQ-Style QA + :param meta_field : Name of field to store all the meta data (key value pairs) in the Document object :param similarity: The similarity function used to compare document vectors. 'dot_product' is the default. :param index_type: Index type of any vector object defined in weaviate schema. The vector index type is pluggable. Currently, HSNW is only supported. 
@@ -86,7 +88,7 @@ def __init__( self.set_config( weaviate_url=weaviate_url, timeout_config=timeout_config, username=username, password=password, index=index, vector_dim=vector_dim, text_field=text_field, name_field=name_field, - faq_question_field=faq_question_field, similarity=similarity, index_type=index_type, + faq_question_field=faq_question_field, meta_field=meta_field, similarity=similarity, index_type=index_type, custom_schema=custom_schema, module_name=module_name, update_existing_documents=update_existing_documents, return_embedding=return_embedding, embedding_field=embedding_field, progress_bar=progress_bar, ) @@ -119,6 +121,7 @@ def __init__( self.text_field = text_field self.name_field = name_field self.faq_question_field = faq_question_field + self.meta_field = meta_field self.similarity = similarity self.index_type = index_type self.custom_schema = custom_schema @@ -208,7 +211,7 @@ def _create_schema_and_index_if_not_exist( "vectorizePropertyName": False } }, - "name": "meta" + "name": self.meta_field } ], "vectorIndexConfig": { @@ -250,30 +253,35 @@ def _convert_weaviate_result_to_document( text = props.get(self.text_field) question = props.get(self.faq_question_field) - # We put all additional data of the doc into meta_data and return it in the API - meta_data = {k:v for k,v in props.items() if k not in (self.text_field, self.faq_question_field, self.embedding_field)} - name = meta_data.pop(self.name_field, None) - if name: - meta_data["name"] = name - # Weaviate creates "_additional" key for semantic search - if result.get("_additional"): - score = result.get("_additional").get('certainty') if result.get("_additional").get('certainty') else None + if props.get("_additional"): + score = props.get("_additional").get('certainty') if props.get("_additional").get('certainty') else None if score: probability = score + id = props.get("_additional").get('id') if props.get("_additional").get('id') else None + props.pop("_additional", None) - id = 
result.get("_additional").get('id') if result.get("_additional").get('id') else None + # We put all additional data of the doc into meta_data and return it in the API + meta_data = {k:v for k,v in props.items() if k not in (self.text_field, self.faq_question_field, self.embedding_field)} + meta = meta_data.pop(self.meta_field, None) + if meta: + import ast + meta = ast.literal_eval(meta) + name = meta_data.pop(self.name_field, None) + if name: + meta[self.name_field] = name if return_embedding: - if not embedding: - embedding = result.get("_additional").get("vector") + #if not embedding: + # TODO: Not sure if _additional can have vector + #embedding = result.get("_additional").get("vector") if embedding: embedding = np.asarray(embedding, dtype=np.float32) document = Document( id=id, text=text, - meta=meta_data, + meta=meta, score=score, probability=probability, question=question, @@ -307,6 +315,7 @@ def get_documents_by_id(self, ids: List[str], index: Optional[str] = None) -> Li """Fetch documents by specifying a list of text id strings""" index = index or self.index docs = [] + #TODO: better implementation with multiple where filters instead of chatty call below? 
for id in ids: docs.append(self.get_document_by_id(id)) return docs @@ -354,10 +363,15 @@ def write_documents( else: docs_batch.add(_doc, class_name=self.index, uuid=doc_id) - outputs = self.weaviate_client.batch.create(docs_batch) - for output in outputs: - if output.get('result').get('errors'): - print(output.get('result').get('errors')) + # Ingest a batch of documents + results = self.weaviate_client.batch.create(docs_batch) + # Weaviate returns errors for every failed document in the batch + if results is not None: + for result in results: + if 'result' in result and 'errors' in result['result'] \ + and 'error' in result['result']['errors']: + for message in result['result']['errors']['error']: + logger.error(f"{message['message']}") progress_bar.update(batch_size) progress_bar.close() @@ -410,10 +424,12 @@ def _get_all_documents_in_index( Return all documents in a specific index in the document store """ index = index or self.index + properties = [self.text_field, self.faq_question_field, self.name_field, self.meta_field, "_additional {id, certainty}"] + if filters: raise OSError("Filters are not supported currently in WeaviateDocumentStore!") - result = self.weaviate_client.query.get(class_name=index, properties=[self.text_field,"_additional {id, certainty}"])\ + result = self.weaviate_client.query.get(class_name=index, properties=properties)\ .with_limit(batch_size)\ .do() yield from result.get("data").get("Get").get(index) @@ -471,6 +487,7 @@ def query( logger.warning("Query filters are not implemented for the WeaviateDocumentStore.") index = index or self.index + properties = [self.text_field, self.faq_question_field, self.name_field, self.meta_field, "_additional {id, certainty}"] if custom_query: query_output = self.weaviate_client.query.raw(custom_query) @@ -479,7 +496,7 @@ def query( "concepts" : [query] } query_output = self.weaviate_client.query\ - .get(class_name=index, properties=[self.text_field,"_additional {id, certainty}"])\ + 
.get(class_name=index, properties=properties)\ .with_near_text(query_string)\ .with_limit(top_k)\ .do() @@ -515,13 +532,14 @@ def query_by_embedding(self, if return_embedding is None: return_embedding = self.return_embedding index = index or self.index + properties = [self.text_field, self.faq_question_field, self.name_field, self.meta_field, "_additional {id, certainty}"] query_emb = query_emb.reshape(1, -1).astype(np.float32) query_string = { "vector" : query_emb } query_output = self.weaviate_client.query\ - .get(class_name=index, properties=[self.text_field,"_additional {id, certainty}"])\ + .get(class_name=index, properties=properties)\ .with_near_vector(query_string)\ .with_limit(top_k)\ .do() From 3a62ac5e171a8b7352c0609192ef9b4aa4c67590 Mon Sep 17 00:00:00 2001 From: venuraja79 Date: Sat, 5 Jun 2021 19:57:18 +0530 Subject: [PATCH 08/22] update embeddings, dynamic schema and filters implemented --- haystack/document_store/weaviate.py | 285 +++++++++++++++++----------- 1 file changed, 170 insertions(+), 115 deletions(-) diff --git a/haystack/document_store/weaviate.py b/haystack/document_store/weaviate.py index 2a6036f9ed..a74a3e7880 100644 --- a/haystack/document_store/weaviate.py +++ b/haystack/document_store/weaviate.py @@ -33,20 +33,19 @@ class WeaviateDocumentStore(BaseDocumentStore): def __init__( self, - weaviate_url: str = "http://localhost:8080", + host: Union[str, List[str]] = "http://localhost", + port: Union[int, List[int]] = 8080, timeout_config: tuple = (5, 15), username: str = None, password: str = None, index: str = "Document", - vector_dim: int = 768, + embedding_dim: int = 768, text_field: str = "text", name_field: str = "name", faq_question_field = "question", - meta_field = "meta", similarity: str = "dot_product", index_type: str = "hnsw", custom_schema: Optional[dict] = None, - module_name: str = "text2vec-transformers", update_existing_documents: bool = False, return_embedding: bool = False, embedding_field: str = "embedding", @@ -54,18 
+53,18 @@ def __init__( **kwargs, ): """ - :param weaviate_url: Weaviate server connection URL for storing and processing documents and vectors. + :param host: Weaviate server connection URL for storing and processing documents and vectors. For more details, refer "https://www.semi.technology/developers/weaviate/current/getting-started/installation.html" + :param port: port of Weaviate instance :param timeout_config: Weaviate Timeout config as a tuple of (retries, time out seconds). :param username: username (standard authentication via http_auth) :param password: password (standard authentication via http_auth) :param index: Index name for document text, embedding and metadata (in Weaviate terminology, this is a "Class" in Weaviate schema). - :param vector_dim: The embedding vector size. Default: 768. + :param embedding_dim: The embedding vector size. Default: 768. :param text_field: Name of field that might contain the answer and will therefore be passed to the Reader Model (e.g. "full_text"). If no Reader is used (e.g. in FAQ-Style QA) the plain content of this field will just be returned. :param name_field: Name of field that contains the title of the the doc :param faq_question_field: Name of field containing the question in case of FAQ-Style QA - :param meta_field : Name of field to store all the meta data (key value pairs) in the Document object :param similarity: The similarity function used to compare document vectors. 'dot_product' is the default. :param index_type: Index type of any vector object defined in weaviate schema. The vector index type is pluggable. Currently, HSNW is only supported. 
@@ -86,14 +85,15 @@ def __init__( # save init parameters to enable export of component config as YAML self.set_config( - weaviate_url=weaviate_url, timeout_config=timeout_config, username=username, password=password, - index=index, vector_dim=vector_dim, text_field=text_field, name_field=name_field, - faq_question_field=faq_question_field, meta_field=meta_field, similarity=similarity, index_type=index_type, - custom_schema=custom_schema, module_name=module_name, update_existing_documents=update_existing_documents, + host=host, port=port, timeout_config=timeout_config, username=username, password=password, + index=index, embedding_dim=embedding_dim, text_field=text_field, name_field=name_field, + faq_question_field=faq_question_field, similarity=similarity, index_type=index_type, + custom_schema=custom_schema, update_existing_documents=update_existing_documents, return_embedding=return_embedding, embedding_field=embedding_field, progress_bar=progress_bar, ) # Connect to Weaviate server using python binding + weaviate_url =f"{host}:{port}" if username and password: secret = AuthClientPassword(username, password) self.weaviate_client = client.Client(url=weaviate_url, @@ -117,15 +117,13 @@ def __init__( f"at `{weaviate_url}` and that it has finished the initial ramp up (can take > 30s)." 
) self.index = index - self.vector_dim = vector_dim + self.embedding_dim = embedding_dim self.text_field = text_field self.name_field = name_field self.faq_question_field = faq_question_field - self.meta_field = meta_field self.similarity = similarity self.index_type = index_type self.custom_schema = custom_schema - self.module_name = module_name self.update_existing_documents = update_existing_documents self.return_embedding = return_embedding self.embedding_field = embedding_field @@ -133,14 +131,11 @@ def __init__( self._create_schema_and_index_if_not_exist(self.index) - #def __del__(self): - #return self.milvus_server.close() - def _create_schema_and_index_if_not_exist( self, index: Optional[str] = None, ): - """Create a new index(schema/class in Weaviate)for storing documents in case if an index (schema) with the name doesn't exist already.""" + """Create a new index (schema/class in Weaviate) for storing documents in case if an index (schema) with the name doesn't exist already.""" index = index or self.index if self.custom_schema: @@ -150,28 +145,17 @@ def _create_schema_and_index_if_not_exist( "classes": [ { "class": index, - "description": "Haystack default class", + "description": "Haystack index, it's a class in Weaviate", "invertedIndexConfig": { "cleanupIntervalSeconds": 60 }, - "moduleConfig": { - "text2vec-transformers": { - "poolingStrategy": "masked_mean", - "vectorizeClassName": False - } - }, + "vectorizer": "none", "properties": [ { "dataType": [ "string" ], "description": "Name Field", - "moduleConfig": { - "text2vec-transformers": { - "skip": False, - "vectorizePropertyName": False - } - }, "name": self.name_field }, { @@ -179,12 +163,6 @@ def _create_schema_and_index_if_not_exist( "string" ], "description": "Question Field", - "moduleConfig": { - "text2vec-transformers": { - "skip": False, - "vectorizePropertyName": False - } - }, "name": self.faq_question_field }, { @@ -192,36 +170,9 @@ def _create_schema_and_index_if_not_exist( "text" ], 
"description": "Document Text", - "moduleConfig": { - "text2vec-transformers": { - "skip": True, - "vectorizePropertyName": True - } - }, "name": self.text_field }, - { - "dataType": [ - "string" - ], - "description": "Document meta", - "moduleConfig": { - "text2vec-transformers": { - "skip": False, - "vectorizePropertyName": False - } - }, - "name": self.meta_field - } ], - "vectorIndexConfig": { - "cleanupIntervalSeconds": 300, - "maxConnections": 64, - "efConstruction": 128, - "vectorCacheMaxObjects": 500000 - }, - "vectorIndexType": self.index_type, - "vectorizer": self.module_name } ] } @@ -244,8 +195,8 @@ def _convert_weaviate_result_to_document( id = result.get("id") embedding = result.get("vector") - # If properties key is present, the fields are in this key. - # otherwise, a direct lookup in result dict + # If properties key is present, get all the document fields from it. + # otherwise, a direct lookup in result root dict props = result.get("properties") if not props: props = result @@ -263,25 +214,14 @@ def _convert_weaviate_result_to_document( # We put all additional data of the doc into meta_data and return it in the API meta_data = {k:v for k,v in props.items() if k not in (self.text_field, self.faq_question_field, self.embedding_field)} - meta = meta_data.pop(self.meta_field, None) - if meta: - import ast - meta = ast.literal_eval(meta) - name = meta_data.pop(self.name_field, None) - if name: - meta[self.name_field] = name - - if return_embedding: - #if not embedding: - # TODO: Not sure if _additional can have vector - #embedding = result.get("_additional").get("vector") - if embedding: - embedding = np.asarray(embedding, dtype=np.float32) + + if return_embedding and embedding: + embedding = np.asarray(embedding, dtype=np.float32) document = Document( id=id, text=text, - meta=meta, + meta=meta_data, score=score, probability=probability, question=question, @@ -320,6 +260,50 @@ def get_documents_by_id(self, ids: List[str], index: Optional[str] = None) 
-> Li docs.append(self.get_document_by_id(id)) return docs + def _get_current_properties(self, index: str) -> List[str]: + """Get all the existing properties in the schema""" + cur_properties = [] + for class_item in self.weaviate_client.schema.get()['classes']: + if class_item['class'] == index: + cur_properties = [item['name'] for item in class_item['properties']] + + return cur_properties + + def _build_filter_clause(self, filters:Dict[str, List[str]]) -> dict: + """Transform Haystack filter conditions to Weaviate where filter clauses""" + weaviate_filters = [] + for key, value in filters.items(): + weaviate_filter = { + "path": [key], + "operator": "Equal", + "valueString": value + } + weaviate_filters.append(weaviate_filter) + if len(weaviate_filters) > 1: + filter_dict = { + "operator": "Or", + "operands": weaviate_filters + } + else: + filter_dict = weaviate_filters[0] + + return filter_dict + + def _update_schema(self, new_prop:str): + """Updates the schema with a new property""" + property_dict = { + "dataType": [ + "string" + ], + "description": f"dynamic property {new_prop}", + "name": new_prop + } + self.weaviate_client.schema.property.create(self.index, property_dict) + + def _check_document(self, cur_props: List[str], doc: dict) -> List[str]: + """Find the properties in the document that don't exist in the existing schema""" + return [item for item in doc.keys() if item not in cur_props] + def write_documents( self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, batch_size: int = 10_000 ): @@ -340,6 +324,10 @@ def write_documents( logger.warning("Calling DocumentStore.write_documents() with empty list") return + # Auto schema feature https://github.com/semi-technologies/weaviate/issues/1539 + # Get and cache current properties in the schema + current_properties = self._get_current_properties(index) + document_objects = [Document.from_dict(d, field_map=field_map) if isinstance(d, dict) else d for d in documents] 
batched_documents = get_batches_from_generator(document_objects, batch_size) @@ -352,16 +340,28 @@ def write_documents( } _ = _doc.pop("score", None) _ = _doc.pop("probability", None) + + # In order to have a flat structure in elastic + similar behaviour to the other DocumentStores, + # we "unnest" all value within "meta" if "meta" in _doc.keys(): - _doc["meta"] = str(_doc.get("meta")) + for k, v in _doc["meta"].items(): + _doc[k] = v + _doc.pop("meta") + doc_id = str(_doc.pop("id")) vector = _doc.pop(self.embedding_field) if _doc.get(self.faq_question_field) is None: _doc.pop(self.faq_question_field) - if vector: - docs_batch.add(_doc, class_name=self.index, uuid=doc_id, vector=vector) - else: - docs_batch.add(_doc, class_name=self.index, uuid=doc_id) + + # Check if additional properties are in the document, if so, + # append the schema with all the additional properties + missing_props = self._check_document(current_properties, _doc) + if missing_props: + for property in missing_props: + self._update_schema(property) + current_properties.append(property) + + docs_batch.add(_doc, class_name=self.index, uuid=doc_id, vector=vector) # Ingest a batch of documents results = self.weaviate_client.batch.create(docs_batch) @@ -379,15 +379,22 @@ def update_document_meta(self, id: str, meta: Dict[str, str]): """ Update the metadata dictionary of a document by specifying its string id """ - body = {"meta": meta} - self.weaviate_client.data_object.update(body, class_name=self.index, uuid=id) + self.weaviate_client.data_object.update(meta, class_name=self.index, uuid=id) def get_document_count(self, filters: Optional[Dict[str, List[str]]] = None, index: Optional[str] = None) -> int: """ Return the number of documents in the document store. 
""" - #TODO: Figuring out how to do this using weaviate python client - pass + doc_count = 0 + result = self.weaviate_client.query.aggregate(index)\ + .with_fields("meta { count }")\ + .do() + + if "data" in result: + if "Aggregate" in result.get('data'): + doc_count = result.get('data').get('Aggregate').get(index)[0]['meta']['count'] + + return doc_count def get_all_documents( self, @@ -424,14 +431,19 @@ def _get_all_documents_in_index( Return all documents in a specific index in the document store """ index = index or self.index - properties = [self.text_field, self.faq_question_field, self.name_field, self.meta_field, "_additional {id, certainty}"] - if filters: - raise OSError("Filters are not supported currently in WeaviateDocumentStore!") + # Build the properties to retrieve from Weaviate + properties = self._get_current_properties(index) + properties.append("_additional {id, certainty}") - result = self.weaviate_client.query.get(class_name=index, properties=properties)\ - .with_limit(batch_size)\ - .do() + if filters: + filter_dict = self._build_filter_clause(filters=filters) + result = self.weaviate_client.query.get(class_name=index, properties=properties)\ + .with_where(filter_dict)\ + .do() + else: + result = self.weaviate_client.query.get(class_name=index, properties=properties)\ + .do() yield from result.get("data").get("Get").get(index) def get_all_documents_generator( @@ -483,18 +495,27 @@ def query( :param index: The name of the index in the DocumentStore from which to retrieve documents """ - if filters: - logger.warning("Query filters are not implemented for the WeaviateDocumentStore.") - index = index or self.index - properties = [self.text_field, self.faq_question_field, self.name_field, self.meta_field, "_additional {id, certainty}"] + + # Build the properties to retrieve from Weaviate + properties = self._get_current_properties(index) + properties.append("_additional {id, certainty}") + + query_string = { + "concepts": [query] + } if 
custom_query: query_output = self.weaviate_client.query.raw(custom_query) + elif filters: + filter_dict = self._build_filter_clause(filters) + query_output = self.weaviate_client.query\ + .get(class_name=index, properties=properties)\ + .with_where(filter_dict)\ + .with_near_text(query_string)\ + .with_limit(top_k)\ + .do() else: - query_string = { - "concepts" : [query] - } query_output = self.weaviate_client.query\ .get(class_name=index, properties=properties)\ .with_near_text(query_string)\ @@ -526,23 +547,32 @@ def query_by_embedding(self, :param return_embedding: To return document embedding :return: """ - if filters: - logger.warning("Query filters are not implemented for the WeaviateDocumentStore.") - if return_embedding is None: return_embedding = self.return_embedding index = index or self.index - properties = [self.text_field, self.faq_question_field, self.name_field, self.meta_field, "_additional {id, certainty}"] + + # Build the properties to retrieve from Weaviate + properties = self._get_current_properties(index) + properties.append("_additional {id, certainty}") query_emb = query_emb.reshape(1, -1).astype(np.float32) query_string = { "vector" : query_emb } - query_output = self.weaviate_client.query\ - .get(class_name=index, properties=properties)\ - .with_near_vector(query_string)\ - .with_limit(top_k)\ - .do() + if filters: + filter_dict = self._build_filter_clause(filters) + query_output = self.weaviate_client.query\ + .get(class_name=index, properties=properties)\ + .with_where(filter_dict)\ + .with_near_vector(query_string)\ + .with_limit(top_k)\ + .do() + else: + query_output = self.weaviate_client.query\ + .get(class_name=index, properties=properties)\ + .with_near_vector(query_string)\ + .with_limit(top_k)\ + .do() results = query_output.get("data").get("Get").get(self.index) documents = [] @@ -575,9 +605,35 @@ def update_embeddings( :param batch_size: When working with large number of documents, batching can help reduce memory footprint. 
:return: None """ - raise RuntimeError("Weaviate store produces embeddings by default based on the configuration in " - "schema. Update embeddings isn't implemented for this store!") + if index is None: + index = self.index + if not self.embedding_field: + raise RuntimeError("Specify the arg `embedding_field` when initializing ElasticsearchDocumentStore()") + + if update_existing_embeddings: + logger.info(f"Updating embeddings for all {self.get_document_count(index=index)} docs ...") + else: + logger.info(f"Updating embeddings for new docs without embeddings ...") + + result = self._get_all_documents_in_index( + index=index, + filters=filters, + batch_size=batch_size, + only_documents_without_embedding=not update_existing_embeddings + ) + + for result_batch in get_batches_from_generator(result, batch_size): + document_batch = [self._convert_weaviate_result_to_document(hit, return_embedding=False) for hit in result_batch] + embeddings = retriever.embed_passages(document_batch) # type: ignore + assert len(document_batch) == len(embeddings) + + if embeddings[0].shape[0] != self.embedding_dim: + raise RuntimeError(f"Embedding dim. of model ({embeddings[0].shape[0]})" + f" doesn't match embedding dim. in DocumentStore ({self.embedding_dim})." 
+ "Specify the arg `embedding_dim` when initializing WeaviateDocumentStore()") + for doc, emb in zip(document_batch, embeddings): + self.weaviate_client.data_object.update({}, index, doc.id, emb.tolist()) def delete_all_documents(self, index: Optional[str] = None, filters: Optional[Dict[str, List[str]]] = None): """ @@ -589,4 +645,3 @@ def delete_all_documents(self, index: Optional[str] = None, filters: Optional[Di """ index = index or self.index self.weaviate_client.schema.delete_class(index) - From 4a67f163ab346d4d824dd8222ee3c67d074fdc74 Mon Sep 17 00:00:00 2001 From: venuraja79 Date: Sat, 5 Jun 2021 23:37:46 +0530 Subject: [PATCH 09/22] Initial set of tests and fixes --- haystack/document_store/weaviate.py | 51 ++++--- test/conftest.py | 34 ++++- test/test_weaviate.py | 207 ++++++++++++++++++++++++++++ 3 files changed, 272 insertions(+), 20 deletions(-) diff --git a/haystack/document_store/weaviate.py b/haystack/document_store/weaviate.py index a74a3e7880..64cfc66a04 100644 --- a/haystack/document_store/weaviate.py +++ b/haystack/document_store/weaviate.py @@ -272,13 +272,14 @@ def _get_current_properties(self, index: str) -> List[str]: def _build_filter_clause(self, filters:Dict[str, List[str]]) -> dict: """Transform Haystack filter conditions to Weaviate where filter clauses""" weaviate_filters = [] - for key, value in filters.items(): - weaviate_filter = { - "path": [key], - "operator": "Equal", - "valueString": value - } - weaviate_filters.append(weaviate_filter) + for key, values in filters.items(): + for value in values: + weaviate_filter = { + "path": [key], + "operator": "Equal", + "valueString": value + } + weaviate_filters.append(weaviate_filter) if len(weaviate_filters) > 1: filter_dict = { "operator": "Or", @@ -289,7 +290,7 @@ def _build_filter_clause(self, filters:Dict[str, List[str]]) -> dict: return filter_dict - def _update_schema(self, new_prop:str): + def _update_schema(self, new_prop:str, index: str): """Updates the schema with a new 
property""" property_dict = { "dataType": [ @@ -298,7 +299,7 @@ def _update_schema(self, new_prop:str): "description": f"dynamic property {new_prop}", "name": new_prop } - self.weaviate_client.schema.property.create(self.index, property_dict) + self.weaviate_client.schema.property.create(index, property_dict) def _check_document(self, cur_props: List[str], doc: dict) -> List[str]: """Find the properties in the document that don't exist in the existing schema""" @@ -310,8 +311,8 @@ def write_documents( """ Add new documents to the DocumentStore. - :param documents: List of `Dicts` or List of `Documents`. If they already contain the embeddings, we'll index - them right away in Weaviate. If not, the embeddings are automatically generated by Weaviate. + :param documents: List of `Dicts` or List of `Documents`. Passing an Embedding/Vector is mandatory in case weaviate is not + configured with a module. If a module is configured, the embedding is automatically generated by Weaviate. :param index: index name for storing the docs and metadata :param batch_size: When working with large number of documents, batching can help reduce memory footprint. :return: @@ -358,10 +359,10 @@ def write_documents( missing_props = self._check_document(current_properties, _doc) if missing_props: for property in missing_props: - self._update_schema(property) + self._update_schema(property, index) current_properties.append(property) - docs_batch.add(_doc, class_name=self.index, uuid=doc_id, vector=vector) + docs_batch.add(_doc, class_name=index, uuid=doc_id, vector=vector) # Ingest a batch of documents results = self.weaviate_client.batch.create(docs_batch) @@ -385,10 +386,18 @@ def get_document_count(self, filters: Optional[Dict[str, List[str]]] = None, ind """ Return the number of documents in the document store. 
""" + index = index or self.index doc_count = 0 - result = self.weaviate_client.query.aggregate(index)\ - .with_fields("meta { count }")\ + if filters: + filter_dict = self._build_filter_clause(filters=filters) + result = self.weaviate_client.query.aggregate(index) \ + .with_fields("meta { count }") \ + .with_where(filter_dict)\ .do() + else: + result = self.weaviate_client.query.aggregate(index)\ + .with_fields("meta { count }")\ + .do() if "data" in result: if "Aggregate" in result.get('data'): @@ -444,7 +453,13 @@ def _get_all_documents_in_index( else: result = self.weaviate_client.query.get(class_name=index, properties=properties)\ .do() - yield from result.get("data").get("Get").get(index) + + all_docs = {} + if result and "data" in result and "Get" in result.get("data"): + if result.get("data").get("Get").get(index): + all_docs = result.get("data").get("Get").get(index) + + return all_docs def get_all_documents_generator( self, @@ -522,7 +537,7 @@ def query( .with_limit(top_k)\ .do() - results = query_output.get("data").get("Get").get(self.index) + results = query_output.get("data").get("Get").get(index) documents = [] for result in results: doc = self._convert_weaviate_result_to_document(result, return_embedding=True) @@ -574,7 +589,7 @@ def query_by_embedding(self, .with_limit(top_k)\ .do() - results = query_output.get("data").get("Get").get(self.index) + results = query_output.get("data").get("Get").get(index) documents = [] for result in results: doc = self._convert_weaviate_result_to_document(result, return_embedding=True) diff --git a/test/conftest.py b/test/conftest.py index 42aac7e064..0c3fa1fb3d 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -9,6 +9,9 @@ from haystack.knowledge_graph.graphdb import GraphDBKnowledgeGraph from milvus import Milvus +import weaviate +from haystack.document_store.weaviate import WeaviateDocumentStore + from haystack.document_store.milvus import MilvusDocumentStore from haystack.generator.transformers import 
RAGenerator, RAGeneratorType @@ -98,6 +101,27 @@ def milvus_fixture(): 'milvusdb/milvus:0.10.5-cpu-d010621-4eda95'], shell=True) time.sleep(40) +@pytest.fixture(scope="session") +def weaviate_fixture(): + # test if a Weaviate server is already running. If not, start Weaviate docker container locally. + # Make sure you have given > 6GB memory to docker engine + try: + weaviate_server = weaviate.Client(url='http://localhost:8080', timeout_config=(5, 15)) + weaviate_server.is_ready() + except: + print("Starting Weaviate servers ...") + status = subprocess.run( + ['docker rm haystack_test_weaviate'], + shell=True + ) + status = subprocess.run( + ['docker run -d --name haystack_test_weaviate -p 8080:8080 semitechnologies/weaviate:1.3.0'], + shell=True + ) + if status.returncode: + raise Exception( + "Failed to launch Weaviate. Please check docker container logs.") + time.sleep(60) @pytest.fixture(scope="session") def graphdb_fixture(): @@ -303,7 +327,7 @@ def get_retriever(retriever_type, document_store): return retriever -@pytest.fixture(params=["elasticsearch", "faiss", "memory", "sql", "milvus"]) +@pytest.fixture(params=["elasticsearch", "faiss", "memory", "sql", "milvus", "weaviate"]) def document_store_with_docs(request, test_docs_xs): document_store = get_document_store(request.param) document_store.write_documents(test_docs_xs) @@ -311,7 +335,7 @@ def document_store_with_docs(request, test_docs_xs): document_store.delete_all_documents() -@pytest.fixture(params=["elasticsearch", "faiss", "memory", "sql", "milvus"]) +@pytest.fixture(params=["elasticsearch", "faiss", "memory", "sql", "milvus", "weaviate"]) def document_store(request, test_docs_xs): document_store = get_document_store(request.param) yield document_store @@ -352,6 +376,12 @@ def get_document_store(document_store_type, embedding_field="embedding"): if collection.startswith("haystack_test"): document_store.milvus_server.drop_collection(collection) return document_store + elif document_store_type == 
"weaviate": + document_store = WeaviateDocumentStore( + weaviate_url="http://localhost:8080", + index="Haystacktest" + ) + return document_store else: raise Exception(f"No document store fixture for '{document_store_type}'") diff --git a/test/test_weaviate.py b/test/test_weaviate.py index e69de29bb2..16b5738818 100644 --- a/test/test_weaviate.py +++ b/test/test_weaviate.py @@ -0,0 +1,207 @@ +import numpy as np +import pytest +from haystack import Document +from conftest import get_document_store +import uuid + +#1 Property names can't have _ or numbers +#2 Mandatory to pass a vector to create documents +#3 Only string properties are supported for now +#4 Only simple string filters + +embedding_dim = 768 + +DOCUMENTS = [ + {"text": "text1", "key": "a", "embedding": np.random.rand(embedding_dim).astype(np.float32)}, + {"text": "text2", "key": "b", "embedding": np.random.rand(embedding_dim).astype(np.float32)}, + {"text": "text3", "key": "b", "embedding": np.random.rand(embedding_dim).astype(np.float32)}, + {"text": "text4", "key": "b", "embedding": np.random.rand(embedding_dim).astype(np.float32)}, + {"text": "text5", "key": "b", "embedding": np.random.rand(embedding_dim).astype(np.float32)}, +] + +DOCUMENTS_XS = [ + # current "dict" format for a document + {"text": "My name is Carla and I live in Berlin", "meta": {"metafield": "test1", "name": "filename1"}, "embedding": np.random.rand(embedding_dim).astype(np.float32)}, + # meta_field at the top level for backward compatibility + {"text": "My name is Paul and I live in New York", "metafield": "test2", "name": "filename2", "embedding": np.random.rand(embedding_dim).astype(np.float32)}, + # Document object for a doc + Document(text="My name is Christelle and I live in Paris", meta={"metafield": "test3", "name": "filename3"}, embedding=np.random.rand(embedding_dim).astype(np.float32)) + ] + +''' + +document_store = WeaviateDocumentStore( + weaviate_url="http://localhost:8080", + index="Haystacktest" +) 
+document_store.write_documents(documents) +#assert len(list(document_store.get_all_documents(batch_size=2))) == 5 +#print(document_store.get_document_by_id("548dd341-b348-4fd9-976c-63e29c450cda")) +#print(document_store.query("text")) +print(document_store.query_by_embedding(np.random.rand(768).astype(np.float32), top_k=1)) +#document_store.delete_all_documents()''' + +@pytest.fixture(params=["weaviate"]) +def document_store_with_docs(request): + document_store = get_document_store(request.param) + document_store.write_documents(DOCUMENTS_XS) + yield document_store + document_store.delete_all_documents() + +@pytest.mark.parametrize("document_store_with_docs", ["weaviate"], indirect=True) +def test_get_all_documents_without_filters(document_store_with_docs): + documents = document_store_with_docs.get_all_documents() + assert all(isinstance(d, Document) for d in documents) + assert len(documents) == 3 + assert {d.meta["name"] for d in documents} == {"filename1", "filename2", "filename3"} + assert {d.meta["metafield"] for d in documents} == {"test1", "test2", "test3"} + +def test_get_all_documents_with_correct_filters(document_store_with_docs): + documents = document_store_with_docs.get_all_documents(filters={"metafield": ["test2"]}) + assert len(documents) == 1 + assert documents[0].meta["name"] == "filename2" + + documents = document_store_with_docs.get_all_documents(filters={"metafield": ["test1", "test3"]}) + assert len(documents) == 2 + assert {d.meta["name"] for d in documents} == {"filename1", "filename3"} + assert {d.meta["metafield"] for d in documents} == {"test1", "test3"} + +def test_get_all_documents_with_incorrect_filter_name(document_store_with_docs): + documents = document_store_with_docs.get_all_documents(filters={"incorrectmetafield": ["test2"]}) + assert len(documents) == 0 + + +def test_get_all_documents_with_incorrect_filter_value(document_store_with_docs): + documents = document_store_with_docs.get_all_documents(filters={"metafield": 
["incorrect_value"]}) + assert len(documents) == 0 + +def test_get_documents_by_id(document_store_with_docs): + documents = document_store_with_docs.get_all_documents() + doc = document_store_with_docs.get_document_by_id(documents[0].id) + assert doc.id == documents[0].id + assert doc.text == documents[0].text + +@pytest.mark.parametrize("document_store", ["weaviate"], indirect=True) +def test_get_document_count(document_store): + document_store.write_documents(DOCUMENTS) + assert document_store.get_document_count() == 5 + assert document_store.get_document_count(filters={"key": ["a"]}) == 1 + assert document_store.get_document_count(filters={"key": ["b"]}) == 4 + +@pytest.mark.parametrize("document_store", ["weaviate"], indirect=True) +@pytest.mark.parametrize("batch_size", [2]) +def test_weaviate_write_docs(document_store, batch_size): + # Write in small batches + for i in range(0, len(DOCUMENTS), batch_size): + document_store.write_documents(DOCUMENTS[i: i + batch_size]) + + documents_indexed = document_store.get_all_documents() + assert len(documents_indexed) == len(DOCUMENTS) + + documents_indexed = document_store.get_all_documents(batch_size=batch_size) + assert len(documents_indexed) == len(DOCUMENTS) + +@pytest.mark.parametrize("document_store", ["weaviate"], indirect=True) +def test_get_all_document_filter_duplicate_value(document_store): + documents = [ + Document( + text="Doc1", + meta={"fone": "f0"}, + embedding= np.random.rand(embedding_dim).astype(np.float32) + ), + Document( + text="Doc1", + meta={"fone": "f1", "metaid": "0"}, + embedding = np.random.rand(embedding_dim).astype(np.float32) + ), + Document( + text="Doc2", + meta={"fthree": "f0"}, + embedding=np.random.rand(embedding_dim).astype(np.float32) + ) + ] + document_store.write_documents(documents) + documents = document_store.get_all_documents(filters={"fone": ["f1"]}) + assert documents[0].text == "Doc1" + assert len(documents) == 1 + assert {d.meta["metaid"] for d in documents} == {"0"} + 
+@pytest.mark.parametrize("document_store", ["weaviate"], indirect=True) +def test_get_all_documents_generator(document_store): + document_store.write_documents(DOCUMENTS) + assert len(list(document_store.get_all_documents_generator(batch_size=2))) == 5 + + +@pytest.mark.parametrize("document_store", ["weaviate"], indirect=True) +@pytest.mark.parametrize("update_existing_documents", [True, False]) +def test_update_existing_documents(document_store, update_existing_documents): + id = uuid.uuid4() + original_docs = [ + {"text": "text1_orig", "id": id, "metafieldforcount": "a", "embedding": np.random.rand(embedding_dim).astype(np.float32)}, + ] + + updated_docs = [ + {"text": "text1_new", "id": id, "metafieldforcount": "a", "embedding": np.random.rand(embedding_dim).astype(np.float32)}, + ] + + document_store.update_existing_documents = update_existing_documents + document_store.write_documents(original_docs) + assert document_store.get_document_count() == 1 + + if update_existing_documents: + document_store.write_documents(updated_docs) + else: + with pytest.raises(Exception): + document_store.write_documents(updated_docs) + + stored_docs = document_store.get_all_documents() + assert len(stored_docs) == 1 + if update_existing_documents: + assert stored_docs[0].text == updated_docs[0]["text"] + else: + assert stored_docs[0].text == original_docs[0]["text"] + +@pytest.mark.parametrize("document_store", ["weaviate"], indirect=True) +def test_write_document_meta(document_store): + uid1 = str(uuid.uuid4()) + uid2 = str(uuid.uuid4()) + uid3 = str(uuid.uuid4()) + uid4 = str(uuid.uuid4()) + documents = [ + {"text": "dict_without_meta", "id": uid1, "embedding": np.random.rand(embedding_dim).astype(np.float32)}, + {"text": "dict_with_meta", "metafield": "test2", "name": "filename2", "id": uid2, "embedding": np.random.rand(embedding_dim).astype(np.float32)}, + Document(text="document_object_without_meta", id=uid3, embedding=np.random.rand(embedding_dim).astype(np.float32)), + 
Document(text="document_object_with_meta", meta={"metafield": "test4", "name": "filename3"}, id=uid4, embedding=np.random.rand(embedding_dim).astype(np.float32)), + ] + document_store.write_documents(documents) + documents_in_store = document_store.get_all_documents() + assert len(documents_in_store) == 4 + + assert not document_store.get_document_by_id(uid1).meta + assert document_store.get_document_by_id(uid2).meta["metafield"] == "test2" + assert not document_store.get_document_by_id(uid3).meta + assert document_store.get_document_by_id(uid4).meta["metafield"] == "test4" + + +@pytest.mark.parametrize("document_store", ["weaviate"], indirect=True) +def test_write_document_index(document_store): + documents = [ + {"text": "text1", "id": uuid.uuid4(), "embedding": np.random.rand(embedding_dim).astype(np.float32)}, + {"text": "text2", "id": uuid.uuid4(), "embedding": np.random.rand(embedding_dim).astype(np.float32)}, + ] + + document_store.write_documents([documents[0]], index="Hayone") + assert len(document_store.get_all_documents(index="Hayone")) == 1 + + document_store.write_documents([documents[1]], index="Haytwo") + assert len(document_store.get_all_documents(index="Haytwo")) == 1 + + assert len(document_store.get_all_documents(index="Hayone")) == 1 + assert len(document_store.get_all_documents()) == 0 + + document_store.delete_all_documents("Hayone") + document_store.delete_all_documents("Haytwo") + + + + From 71b9bb4fd390096d7621a853cb559ea64c82e262 Mon Sep 17 00:00:00 2001 From: venuraja79 Date: Sun, 6 Jun 2021 16:32:45 +0530 Subject: [PATCH 10/22] Tests added for update_embeddings and delete documents --- haystack/document_store/weaviate.py | 56 ++++++++++----- test/conftest.py | 2 + test/test_weaviate.py | 101 +++++++++++++++++++++------- 3 files changed, 117 insertions(+), 42 deletions(-) diff --git a/haystack/document_store/weaviate.py b/haystack/document_store/weaviate.py index 64cfc66a04..db188959c0 100644 --- a/haystack/document_store/weaviate.py +++ 
b/haystack/document_store/weaviate.py @@ -210,6 +210,7 @@ def _convert_weaviate_result_to_document( if score: probability = score id = props.get("_additional").get('id') if props.get("_additional").get('id') else None + embedding = props.get("_additional").get('vector') if props.get("_additional").get('vector') else None props.pop("_additional", None) # We put all additional data of the doc into meta_data and return it in the API @@ -254,11 +255,9 @@ def get_document_by_id(self, id: str, index: Optional[str] = None) -> Optional[D def get_documents_by_id(self, ids: List[str], index: Optional[str] = None) -> List[Document]: """Fetch documents by specifying a list of text id strings""" index = index or self.index - docs = [] #TODO: better implementation with multiple where filters instead of chatty call below? - for id in ids: - docs.append(self.get_document_by_id(id)) - return docs + documents = [self.get_document_by_id(id) for id in ids] + return documents def _get_current_properties(self, index: str) -> List[str]: """Get all the existing properties in the schema""" @@ -443,7 +442,7 @@ def _get_all_documents_in_index( # Build the properties to retrieve from Weaviate properties = self._get_current_properties(index) - properties.append("_additional {id, certainty}") + properties.append("_additional {id, certainty, vector}") if filters: filter_dict = self._build_filter_clause(filters=filters) @@ -514,7 +513,7 @@ def query( # Build the properties to retrieve from Weaviate properties = self._get_current_properties(index) - properties.append("_additional {id, certainty}") + properties.append("_additional {id, certainty, vector}") query_string = { "concepts": [query] @@ -568,7 +567,7 @@ def query_by_embedding(self, # Build the properties to retrieve from Weaviate properties = self._get_current_properties(index) - properties.append("_additional {id, certainty}") + properties.append("_additional {id, certainty, vector}") query_emb = query_emb.reshape(1, 
-1).astype(np.float32) query_string = { @@ -607,14 +606,12 @@ def update_embeddings( ): """ Updates the embeddings in the the document store using the encoding model specified in the retriever. - This can be useful if want to add or change the embeddings for your documents (e.g. after changing the retriever config). + This can be useful if want to change the embeddings for your documents (e.g. after changing the retriever config). :param retriever: Retriever to use to update the embeddings. :param index: Index name to update - :param update_existing_embeddings: Whether to update existing embeddings of the documents. If set to False, - only documents without embeddings are processed. This mode can be used for - incremental updating of embeddings, wherein, only newly indexed documents - get processed. + :param update_existing_embeddings: Weaviate mandates an embedding while creating the document itself. + This option must be always true for weaviate and it will update the embeddings for all the documents. :param filters: Optional filters to narrow down the documents for which embeddings are to be updated. Example: {"name": ["some", "more"], "category": ["only_one"]} :param batch_size: When working with large number of documents, batching can help reduce memory footprint. @@ -624,18 +621,17 @@ def update_embeddings( index = self.index if not self.embedding_field: - raise RuntimeError("Specify the arg `embedding_field` when initializing ElasticsearchDocumentStore()") + raise RuntimeError("Specify the arg `embedding_field` when initializing WeaviateDocumentStore()") if update_existing_embeddings: logger.info(f"Updating embeddings for all {self.get_document_count(index=index)} docs ...") else: - logger.info(f"Updating embeddings for new docs without embeddings ...") + raise RuntimeError("All the documents in Weaviate store have an embedding by default. 
Only update is allowed!") result = self._get_all_documents_in_index( index=index, filters=filters, batch_size=batch_size, - only_documents_without_embedding=not update_existing_embeddings ) for result_batch in get_batches_from_generator(result, batch_size): @@ -648,7 +644,27 @@ def update_embeddings( f" doesn't match embedding dim. in DocumentStore ({self.embedding_dim})." "Specify the arg `embedding_dim` when initializing WeaviateDocumentStore()") for doc, emb in zip(document_batch, embeddings): - self.weaviate_client.data_object.update({}, index, doc.id, emb.tolist()) + # This doc processing will not required once weaviate's update + # method works. To be improved. + _doc = { + **doc.to_dict(field_map=self._create_document_field_map()) + } + _ = _doc.pop("score", None) + _ = _doc.pop("probability", None) + + if "meta" in _doc.keys(): + for k, v in _doc["meta"].items(): + _doc[k] = v + _doc.pop("meta") + + doc_id = str(_doc.pop("id")) + _ = _doc.pop(self.embedding_field) + keys_to_remove = [k for k,v in _doc.items() if v is None] + for key in keys_to_remove: + _doc.pop(key) + + # TODO: Weaviate's update throws an error while passing a vector now, have to improve this later + self.weaviate_client.data_object.replace(_doc, class_name=index, uuid=doc_id, vector=emb) def delete_all_documents(self, index: Optional[str] = None, filters: Optional[Dict[str, List[str]]] = None): """ @@ -659,4 +675,10 @@ def delete_all_documents(self, index: Optional[str] = None, filters: Optional[Di :return: None """ index = index or self.index - self.weaviate_client.schema.delete_class(index) + if filters: + docs_to_delete = self.get_all_documents(index, filters=filters) + for doc in docs_to_delete: + self.weaviate_client.data_object.delete(doc.id) + else: + self.weaviate_client.schema.delete_class(index) + self._create_schema_and_index_if_not_exist(index) diff --git a/test/conftest.py b/test/conftest.py index 0c3fa1fb3d..4fdc278a9e 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ 
-381,6 +381,8 @@ def get_document_store(document_store_type, embedding_field="embedding"): weaviate_url="http://localhost:8080", index="Haystacktest" ) + document_store.weaviate_client.schema.delete_all() + document_store._create_schema_and_index_if_not_exist() return document_store else: raise Exception(f"No document store fixture for '{document_store_type}'") diff --git a/test/test_weaviate.py b/test/test_weaviate.py index 16b5738818..717615be45 100644 --- a/test/test_weaviate.py +++ b/test/test_weaviate.py @@ -4,11 +4,6 @@ from conftest import get_document_store import uuid -#1 Property names can't have _ or numbers -#2 Mandatory to pass a vector to create documents -#3 Only string properties are supported for now -#4 Only simple string filters - embedding_dim = 768 DOCUMENTS = [ @@ -28,19 +23,6 @@ Document(text="My name is Christelle and I live in Paris", meta={"metafield": "test3", "name": "filename3"}, embedding=np.random.rand(embedding_dim).astype(np.float32)) ] -''' - -document_store = WeaviateDocumentStore( - weaviate_url="http://localhost:8080", - index="Haystacktest" -) -document_store.write_documents(documents) -#assert len(list(document_store.get_all_documents(batch_size=2))) == 5 -#print(document_store.get_document_by_id("548dd341-b348-4fd9-976c-63e29c450cda")) -#print(document_store.query("text")) -print(document_store.query_by_embedding(np.random.rand(768).astype(np.float32), top_k=1)) -#document_store.delete_all_documents()''' - @pytest.fixture(params=["weaviate"]) def document_store_with_docs(request): document_store = get_document_store(request.param) @@ -190,18 +172,87 @@ def test_write_document_index(document_store): {"text": "text2", "id": uuid.uuid4(), "embedding": np.random.rand(embedding_dim).astype(np.float32)}, ] - document_store.write_documents([documents[0]], index="Hayone") - assert len(document_store.get_all_documents(index="Hayone")) == 1 + document_store.write_documents([documents[0]], index="Haystackone") + assert 
len(document_store.get_all_documents(index="Haystackone")) == 1 - document_store.write_documents([documents[1]], index="Haytwo") - assert len(document_store.get_all_documents(index="Haytwo")) == 1 + document_store.write_documents([documents[1]], index="Haystacktwo") + assert len(document_store.get_all_documents(index="Haystacktwo")) == 1 - assert len(document_store.get_all_documents(index="Hayone")) == 1 + assert len(document_store.get_all_documents(index="Haystackone")) == 1 assert len(document_store.get_all_documents()) == 0 - document_store.delete_all_documents("Hayone") - document_store.delete_all_documents("Haytwo") +'''@pytest.mark.parametrize("retriever", ["dpr", "embedding"], indirect=True) +@pytest.mark.parametrize("document_store", ["weaviate"], indirect=True) +def test_update_embeddings(document_store, retriever): + documents = [] + for i in range(6): + documents.append({"text": f"text_{i}", "id": str(uuid.uuid4()), "metafield": f"value_{i}", "embedding": np.random.rand(embedding_dim).astype(np.float32)}) + documents.append({"text": "text_0", "id": str(uuid.uuid4()), "metafield": "value_0", "embedding": np.random.rand(embedding_dim).astype(np.float32)}) + + document_store.write_documents(documents, index="HaystackTestOne") + document_store.update_embeddings(retriever, index="HaystackTestOne", batch_size=3) + documents = document_store.get_all_documents(index="HaystackTestOne", return_embedding=True) + assert len(documents) == 7 + for doc in documents: + assert type(doc.embedding) is np.ndarray + + documents = document_store.get_all_documents( + index="HaystackTestOne", + filters={"metafield": ["value_0"]}, + return_embedding=True, + ) + assert len(documents) == 2 + for doc in documents: + assert doc.meta["metafield"] == "value_0" + np.testing.assert_array_almost_equal(documents[0].embedding, documents[1].embedding, decimal=4) + + documents = document_store.get_all_documents( + index="HaystackTestOne", + filters={"metafield": ["value_1", "value_5"]}, + 
return_embedding=True, + ) + np.testing.assert_raises( + AssertionError, + np.testing.assert_array_equal, + documents[0].embedding, + documents[1].embedding + ) + + doc = {"text": "text_7", "id": str(uuid.uuid4()), "metafield": "value_7", + "embedding": retriever.embed_queries(texts=["a random string"])[0]} + document_store.write_documents([doc], index="HaystackTestOne") + + doc_before_update = document_store.get_all_documents(index="HaystackTestOne", filters={"metafield": ["value_7"]})[0] + embedding_before_update = doc_before_update.embedding + + document_store.update_embeddings( + retriever, index="HaystackTestOne", batch_size=3, filters={"metafield": ["value_0", "value_1"]} + ) + doc_after_update = document_store.get_all_documents(index="HaystackTestOne", filters={"metafield": ["value_7"]})[0] + embedding_after_update = doc_after_update.embedding + np.testing.assert_array_equal(embedding_before_update, embedding_after_update) + + # test update all embeddings + document_store.update_embeddings(retriever, index="HaystackTestOne", batch_size=3, update_existing_embeddings=True) + assert document_store.get_document_count(index="HaystackTestOne") == 8 + doc_after_update = document_store.get_all_documents(index="HaystackTestOne", filters={"metafield": ["value_7"]})[0] + embedding_after_update = doc_after_update.embedding + np.testing.assert_raises(AssertionError, np.testing.assert_array_equal, embedding_before_update, embedding_after_update) +''' +@pytest.mark.parametrize("document_store_with_docs", ["weaviate"], indirect=True) +def test_delete_all_documents(document_store_with_docs): + assert len(document_store_with_docs.get_all_documents()) == 3 + document_store_with_docs.delete_all_documents() + documents = document_store_with_docs.get_all_documents() + assert len(documents) == 0 +@pytest.mark.elasticsearch +@pytest.mark.parametrize("document_store_with_docs", ["weaviate"], indirect=True) +def test_delete_documents_with_filters(document_store_with_docs): + 
document_store_with_docs.delete_all_documents(filters={"metafield": ["test1", "test2"]}) + documents = document_store_with_docs.get_all_documents() + assert len(documents) == 1 + assert documents[0].meta["metafield"] == "test3" From 0bfe557e378745ecf9fa547df326a49ee645a182 Mon Sep 17 00:00:00 2001 From: venuraja79 Date: Tue, 8 Jun 2021 11:03:20 +0530 Subject: [PATCH 11/22] introduced duplicate documents fix --- haystack/document_store/weaviate.py | 89 ++++++++++++++++++++--------- test/test_weaviate.py | 83 +++++++++++++++++++++------ 2 files changed, 127 insertions(+), 45 deletions(-) diff --git a/haystack/document_store/weaviate.py b/haystack/document_store/weaviate.py index db188959c0..7b121e5c6b 100644 --- a/haystack/document_store/weaviate.py +++ b/haystack/document_store/weaviate.py @@ -46,10 +46,10 @@ def __init__( similarity: str = "dot_product", index_type: str = "hnsw", custom_schema: Optional[dict] = None, - update_existing_documents: bool = False, return_embedding: bool = False, embedding_field: str = "embedding", progress_bar: bool = True, + duplicate_documents: str = 'overwrite', **kwargs, ): """ @@ -73,14 +73,15 @@ def __init__( See https://www.semi.technology/developers/weaviate/current/data-schema/schema-configuration.html :param module_name : Vectorization module to convert data into vectors. Default is "text2vec-trasnformers" For more details, See https://www.semi.technology/developers/weaviate/current/modules/ - :param update_existing_documents: Whether to update any existing documents with the same ID when adding - documents. When set as True, any document with an existing ID gets updated. - If set to False, an error is raised if the document ID of the document being - added already exists. :param return_embedding: To return document embedding. :param embedding_field: Name of field containing an embedding vector. :param progress_bar: Whether to show a tqdm progress bar or not. 
Can be helpful to disable in production deployments to keep the logs clean. + :param duplicate_documents:Handle duplicates document based on parameter options. + Parameter options : ( 'skip','overwrite','fail') + skip: Ignore the duplicates documents + overwrite: Update any existing documents with the same ID when adding documents. + fail: an error is raised if the document ID of the document being added already exists. """ # save init parameters to enable export of component config as YAML @@ -88,8 +89,8 @@ def __init__( host=host, port=port, timeout_config=timeout_config, username=username, password=password, index=index, embedding_dim=embedding_dim, text_field=text_field, name_field=name_field, faq_question_field=faq_question_field, similarity=similarity, index_type=index_type, - custom_schema=custom_schema, update_existing_documents=update_existing_documents, - return_embedding=return_embedding, embedding_field=embedding_field, progress_bar=progress_bar, + custom_schema=custom_schema,return_embedding=return_embedding, embedding_field=embedding_field, + progress_bar=progress_bar, duplicate_documents=duplicate_documents ) # Connect to Weaviate server using python binding @@ -124,10 +125,10 @@ def __init__( self.similarity = similarity self.index_type = index_type self.custom_schema = custom_schema - self.update_existing_documents = update_existing_documents self.return_embedding = return_embedding self.embedding_field = embedding_field self.progress_bar = progress_bar + self.duplicate_documents = duplicate_documents self._create_schema_and_index_if_not_exist(self.index) @@ -191,26 +192,31 @@ def _convert_weaviate_result_to_document( """ score = None probability = None + text = None + question = None id = result.get("id") embedding = result.get("vector") # If properties key is present, get all the document fields from it. 
# otherwise, a direct lookup in result root dict - props = result.get("properties") + props:dict = result.get("properties") if not props: props = result - text = props.get(self.text_field) - question = props.get(self.faq_question_field) + if props.get(self.text_field) is not None: + text = str(props.get(self.text_field)) + + if props.get(self.faq_question_field) is not None: + question = props.get(self.faq_question_field) # Weaviate creates "_additional" key for semantic search if props.get("_additional"): - score = props.get("_additional").get('certainty') if props.get("_additional").get('certainty') else None + score = props.get("_additional").get('certainty') if "certainty" in props.get("_additional") else None if score: probability = score - id = props.get("_additional").get('id') if props.get("_additional").get('id') else None - embedding = props.get("_additional").get('vector') if props.get("_additional").get('vector') else None + id = props.get("_additional").get('id') if "id" in props.get("_additional") else None + embedding = props.get("_additional").get('vector') if "vector" in props.get("_additional") else None props.pop("_additional", None) # We put all additional data of the doc into meta_data and return it in the API @@ -249,18 +255,25 @@ def get_document_by_id(self, id: str, index: Optional[str] = None) -> Optional[D 'vector': []}''' index = index or self.index result = self.weaviate_client.data_object.get_by_id(id, with_vector=True) - document = self._convert_weaviate_result_to_document(result, return_embedding=True) - return document + if result: + return self._convert_weaviate_result_to_document(result, return_embedding=True) - def get_documents_by_id(self, ids: List[str], index: Optional[str] = None) -> List[Document]: + def get_documents_by_id(self, ids: List[str], index: Optional[str] = None, + batch_size: int = 10_000) -> List[Document]: """Fetch documents by specifying a list of text id strings""" index = index or self.index + documents = [] 
#TODO: better implementation with multiple where filters instead of chatty call below? - documents = [self.get_document_by_id(id) for id in ids] + for id in ids: + result = self.weaviate_client.data_object.get_by_id(id, with_vector=True) + if result: + document = self._convert_weaviate_result_to_document(result, return_embedding=True) + documents.append(document) return documents - def _get_current_properties(self, index: str) -> List[str]: + def _get_current_properties(self, index: Optional[str] = None) -> List[str]: """Get all the existing properties in the schema""" + index = index or self.index cur_properties = [] for class_item in self.weaviate_client.schema.get()['classes']: if class_item['class'] == index: @@ -289,8 +302,9 @@ def _build_filter_clause(self, filters:Dict[str, List[str]]) -> dict: return filter_dict - def _update_schema(self, new_prop:str, index: str): + def _update_schema(self, new_prop:str, index: Optional[str] = None): """Updates the schema with a new property""" + index = index or self.index property_dict = { "dataType": [ "string" @@ -305,8 +319,8 @@ def _check_document(self, cur_props: List[str], doc: dict) -> List[str]: return [item for item in doc.keys() if item not in cur_props] def write_documents( - self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, batch_size: int = 10_000 - ): + self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, + batch_size: int = 10_000, duplicate_documents: Optional[str] = None): """ Add new documents to the DocumentStore. @@ -314,12 +328,23 @@ def write_documents( configured with a module. If a module is configured, the embedding is automatically generated by Weaviate. :param index: index name for storing the docs and metadata :param batch_size: When working with large number of documents, batching can help reduce memory footprint. - :return: + :param duplicate_documents: Handle duplicates document based on parameter options. 
+ Parameter options : ( 'skip','overwrite','fail') + skip: Ignore the duplicates documents + overwrite: Update any existing documents with the same ID when adding documents. + fail: an error is raised if the document ID of the document being added already + exists. + :raises DuplicateDocumentError: Exception trigger on duplicate document + :return: None """ index = index or self.index self._create_schema_and_index_if_not_exist(index) field_map = self._create_document_field_map() + duplicate_documents = duplicate_documents or self.duplicate_documents + assert duplicate_documents in self.duplicate_documents_options, \ + f"duplicate_documents parameter must be {', '.join(self.duplicate_documents_options)}" + if len(documents) == 0: logger.warning("Calling DocumentStore.write_documents() with empty list") return @@ -329,6 +354,7 @@ def write_documents( current_properties = self._get_current_properties(index) document_objects = [Document.from_dict(d, field_map=field_map) if isinstance(d, dict) else d for d in documents] + document_objects = self._handle_duplicate_documents(document_objects, duplicate_documents) batched_documents = get_batches_from_generator(document_objects, batch_size) with tqdm(total=len(document_objects), disable=not self.progress_bar) as progress_bar: @@ -430,7 +456,7 @@ def get_all_documents( def _get_all_documents_in_index( self, - index: str, + index: Optional[str], filters: Optional[Dict[str, List[str]]] = None, batch_size: int = 10_000, only_documents_without_embedding: bool = False, @@ -508,7 +534,6 @@ def query( :param top_k: How many documents to return per query. 
:param index: The name of the index in the DocumentStore from which to retrieve documents """ - index = index or self.index # Build the properties to retrieve from Weaviate @@ -536,7 +561,11 @@ def query( .with_limit(top_k)\ .do() - results = query_output.get("data").get("Get").get(index) + results = [] + if query_output and "data" in query_output and "Get" in query_output.get("data"): + if query_output.get("data").get("Get").get(index): + results = query_output.get("data").get("Get").get(index) + documents = [] for result in results: doc = self._convert_weaviate_result_to_document(result, return_embedding=True) @@ -588,10 +617,14 @@ def query_by_embedding(self, .with_limit(top_k)\ .do() - results = query_output.get("data").get("Get").get(index) + results = [] + if query_output and "data" in query_output and "Get" in query_output.get("data"): + if query_output.get("data").get("Get").get(index): + results = query_output.get("data").get("Get").get(index) + documents = [] for result in results: - doc = self._convert_weaviate_result_to_document(result, return_embedding=True) + doc = self._convert_weaviate_result_to_document(result, return_embedding=return_embedding) documents.append(doc) return documents diff --git a/test/test_weaviate.py b/test/test_weaviate.py index 717615be45..bb9f946980 100644 --- a/test/test_weaviate.py +++ b/test/test_weaviate.py @@ -6,21 +6,24 @@ embedding_dim = 768 +def get_uuid(): + return str(uuid.uuid4()) + DOCUMENTS = [ - {"text": "text1", "key": "a", "embedding": np.random.rand(embedding_dim).astype(np.float32)}, - {"text": "text2", "key": "b", "embedding": np.random.rand(embedding_dim).astype(np.float32)}, - {"text": "text3", "key": "b", "embedding": np.random.rand(embedding_dim).astype(np.float32)}, - {"text": "text4", "key": "b", "embedding": np.random.rand(embedding_dim).astype(np.float32)}, - {"text": "text5", "key": "b", "embedding": np.random.rand(embedding_dim).astype(np.float32)}, + {"text": "text1", "id":get_uuid(), "key": "a", 
"embedding": np.random.rand(embedding_dim).astype(np.float32)}, + {"text": "text2", "id":get_uuid(), "key": "b", "embedding": np.random.rand(embedding_dim).astype(np.float32)}, + {"text": "text3", "id":get_uuid(), "key": "b", "embedding": np.random.rand(embedding_dim).astype(np.float32)}, + {"text": "text4", "id":get_uuid(), "key": "b", "embedding": np.random.rand(embedding_dim).astype(np.float32)}, + {"text": "text5", "id":get_uuid(), "key": "b", "embedding": np.random.rand(embedding_dim).astype(np.float32)}, ] DOCUMENTS_XS = [ # current "dict" format for a document - {"text": "My name is Carla and I live in Berlin", "meta": {"metafield": "test1", "name": "filename1"}, "embedding": np.random.rand(embedding_dim).astype(np.float32)}, + {"text": "My name is Carla and I live in Berlin", "id":get_uuid(), "meta": {"metafield": "test1", "name": "filename1"}, "embedding": np.random.rand(embedding_dim).astype(np.float32)}, # meta_field at the top level for backward compatibility - {"text": "My name is Paul and I live in New York", "metafield": "test2", "name": "filename2", "embedding": np.random.rand(embedding_dim).astype(np.float32)}, + {"text": "My name is Paul and I live in New York", "id":get_uuid(), "metafield": "test2", "name": "filename2", "embedding": np.random.rand(embedding_dim).astype(np.float32)}, # Document object for a doc - Document(text="My name is Christelle and I live in Paris", meta={"metafield": "test3", "name": "filename3"}, embedding=np.random.rand(embedding_dim).astype(np.float32)) + Document(text="My name is Christelle and I live in Paris", id=get_uuid(), meta={"metafield": "test3", "name": "filename3"}, embedding=np.random.rand(embedding_dim).astype(np.float32)) ] @pytest.fixture(params=["weaviate"]) @@ -89,16 +92,19 @@ def test_get_all_document_filter_duplicate_value(document_store): Document( text="Doc1", meta={"fone": "f0"}, + id = get_uuid(), embedding= np.random.rand(embedding_dim).astype(np.float32) ), Document( text="Doc1", meta={"fone": 
"f1", "metaid": "0"}, + id = get_uuid(), embedding = np.random.rand(embedding_dim).astype(np.float32) ), Document( text="Doc2", meta={"fthree": "f0"}, + id = get_uuid(), embedding=np.random.rand(embedding_dim).astype(np.float32) ) ] @@ -113,6 +119,25 @@ def test_get_all_documents_generator(document_store): document_store.write_documents(DOCUMENTS) assert len(list(document_store.get_all_documents_generator(batch_size=2))) == 5 +@pytest.mark.elasticsearch +@pytest.mark.parametrize("document_store", ["weaviate"], indirect=True) +def test_write_with_duplicate_doc_ids(document_store): + id = get_uuid() + documents = [ + Document( + text="Doc1", + id=id, + embedding=np.random.rand(embedding_dim).astype(np.float32) + ), + Document( + text="Doc2", + id=id, + embedding=np.random.rand(embedding_dim).astype(np.float32) + ) + ] + document_store.write_documents(documents, duplicate_documents="skip") + with pytest.raises(Exception): + document_store.write_documents(documents, duplicate_documents="fail") @pytest.mark.parametrize("document_store", ["weaviate"], indirect=True) @pytest.mark.parametrize("update_existing_documents", [True, False]) @@ -131,10 +156,10 @@ def test_update_existing_documents(document_store, update_existing_documents): assert document_store.get_document_count() == 1 if update_existing_documents: - document_store.write_documents(updated_docs) + document_store.write_documents(updated_docs, duplicate_documents="overwrite") else: with pytest.raises(Exception): - document_store.write_documents(updated_docs) + document_store.write_documents(updated_docs, duplicate_documents="fail") stored_docs = document_store.get_all_documents() assert len(stored_docs) == 1 @@ -145,10 +170,10 @@ def test_update_existing_documents(document_store, update_existing_documents): @pytest.mark.parametrize("document_store", ["weaviate"], indirect=True) def test_write_document_meta(document_store): - uid1 = str(uuid.uuid4()) - uid2 = str(uuid.uuid4()) - uid3 = str(uuid.uuid4()) - uid4 = 
str(uuid.uuid4()) + uid1 = get_uuid() + uid2 = get_uuid() + uid3 = get_uuid() + uid4 = get_uuid() documents = [ {"text": "dict_without_meta", "id": uid1, "embedding": np.random.rand(embedding_dim).astype(np.float32)}, {"text": "dict_with_meta", "metafield": "test2", "name": "filename2", "id": uid2, "embedding": np.random.rand(embedding_dim).astype(np.float32)}, @@ -181,7 +206,7 @@ def test_write_document_index(document_store): assert len(document_store.get_all_documents(index="Haystackone")) == 1 assert len(document_store.get_all_documents()) == 0 -'''@pytest.mark.parametrize("retriever", ["dpr", "embedding"], indirect=True) +@pytest.mark.parametrize("retriever", ["dpr", "embedding"], indirect=True) @pytest.mark.parametrize("document_store", ["weaviate"], indirect=True) def test_update_embeddings(document_store, retriever): documents = [] @@ -238,7 +263,32 @@ def test_update_embeddings(document_store, retriever): doc_after_update = document_store.get_all_documents(index="HaystackTestOne", filters={"metafield": ["value_7"]})[0] embedding_after_update = doc_after_update.embedding np.testing.assert_raises(AssertionError, np.testing.assert_array_equal, embedding_before_update, embedding_after_update) -''' + + +@pytest.mark.parametrize("document_store_with_docs", ["weaviate"], indirect=True) +def test_query_by_embedding(document_store_with_docs): + docs = document_store_with_docs.query_by_embedding(np.random.rand(embedding_dim).astype(np.float32)) + assert len(docs) == 3 + + docs = document_store_with_docs.query_by_embedding(np.random.rand(embedding_dim).astype(np.float32), + top_k=1) + assert len(docs) == 1 + + docs = document_store_with_docs.query_by_embedding(np.random.rand(embedding_dim).astype(np.float32), + filters = {"name": ['filename2']}) + assert len(docs) == 1 + +'''@pytest.mark.parametrize("document_store_with_docs", ["weaviate"], indirect=True) +def test_query(document_store_with_docs): + query_text = 'live' + docs = 
document_store_with_docs.query(query_text) + assert len(docs) == 3 + + docs = document_store_with_docs.query(query_text, top_k=1) + assert len(docs) == 1 + + docs = document_store_with_docs.query(query_text, filters = {"name": ['filename2']}) + assert len(docs) == 1''' @pytest.mark.parametrize("document_store_with_docs", ["weaviate"], indirect=True) def test_delete_all_documents(document_store_with_docs): @@ -248,7 +298,6 @@ def test_delete_all_documents(document_store_with_docs): documents = document_store_with_docs.get_all_documents() assert len(documents) == 0 -@pytest.mark.elasticsearch @pytest.mark.parametrize("document_store_with_docs", ["weaviate"], indirect=True) def test_delete_documents_with_filters(document_store_with_docs): document_store_with_docs.delete_all_documents(filters={"metafield": ["test1", "test2"]}) From 4bf532c3d0f5840e6d933273c976b525a1f81685 Mon Sep 17 00:00:00 2001 From: venuraja79 Date: Tue, 8 Jun 2021 11:47:09 +0530 Subject: [PATCH 12/22] fixed mypy errors --- haystack/document_store/weaviate.py | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/haystack/document_store/weaviate.py b/haystack/document_store/weaviate.py index 7b121e5c6b..8aa0dca096 100644 --- a/haystack/document_store/weaviate.py +++ b/haystack/document_store/weaviate.py @@ -192,7 +192,7 @@ def _convert_weaviate_result_to_document( """ score = None probability = None - text = None + text = "" question = None id = result.get("id") @@ -200,7 +200,7 @@ def _convert_weaviate_result_to_document( # If properties key is present, get all the document fields from it. 
# otherwise, a direct lookup in result root dict - props:dict = result.get("properties") + props = result.get("properties") if not props: props = result @@ -211,12 +211,14 @@ def _convert_weaviate_result_to_document( question = props.get(self.faq_question_field) # Weaviate creates "_additional" key for semantic search - if props.get("_additional"): - score = props.get("_additional").get('certainty') if "certainty" in props.get("_additional") else None - if score: + if "_additional" in props: + if "certainty" in props["_additional"]: + score = props["_additional"]['certainty'] probability = score - id = props.get("_additional").get('id') if "id" in props.get("_additional") else None - embedding = props.get("_additional").get('vector') if "vector" in props.get("_additional") else None + if "id" in props["_additional"]: + id = props["_additional"]['id'] + if "vector" in props["_additional"]: + embedding = props["_additional"]['vector'] props.pop("_additional", None) # We put all additional data of the doc into meta_data and return it in the API @@ -254,9 +256,11 @@ def get_document_by_id(self, id: str, index: Optional[str] = None) -> Optional[D 'text': 'text_5'}, 'vector': []}''' index = index or self.index + document = None result = self.weaviate_client.data_object.get_by_id(id, with_vector=True) if result: - return self._convert_weaviate_result_to_document(result, return_embedding=True) + document = self._convert_weaviate_result_to_document(result, return_embedding=True) + return document def get_documents_by_id(self, ids: List[str], index: Optional[str] = None, batch_size: int = 10_000) -> List[Document]: @@ -284,6 +288,7 @@ def _get_current_properties(self, index: Optional[str] = None) -> List[str]: def _build_filter_clause(self, filters:Dict[str, List[str]]) -> dict: """Transform Haystack filter conditions to Weaviate where filter clauses""" weaviate_filters = [] + weaviate_filter = {} for key, values in filters.items(): for value in values: weaviate_filter = { 
@@ -297,10 +302,9 @@ def _build_filter_clause(self, filters:Dict[str, List[str]]) -> dict: "operator": "Or", "operands": weaviate_filters } + return filter_dict else: - filter_dict = weaviate_filters[0] - - return filter_dict + return weaviate_filter def _update_schema(self, new_prop:str, index: Optional[str] = None): """Updates the schema with a new property""" @@ -484,7 +488,7 @@ def _get_all_documents_in_index( if result.get("data").get("Get").get(index): all_docs = result.get("data").get("Get").get(index) - return all_docs + yield from all_docs def get_all_documents_generator( self, From 500ec218ff8459ca3e58b9402942c95bb19e51df Mon Sep 17 00:00:00 2001 From: venuraja79 Date: Tue, 8 Jun 2021 15:05:13 +0530 Subject: [PATCH 13/22] Added Weaviate to requirements --- .github/workflows/ci.yml | 3 +++ requirements.txt | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1f6df5c170..9400e9ad0b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -76,6 +76,9 @@ jobs: - name: Run Milvus run: docker run -d -p 19530:19530 -p 19121:19121 milvusdb/milvus:1.1.0-cpu-d050721-5e559c + - name: Run Weaviate + run: docker run -d -p 8080:8080 --name haystack_test_weaviate semitechnologies/weaviate:1.3.0 + - name: Run GraphDB run: docker run -d -p 7200:7200 --name haystack_test_graphdb deepset/graphdb-free:9.4.1-adoptopenjdk11 diff --git a/requirements.txt b/requirements.txt index 7303dae0a5..c83ba516e6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -30,4 +30,5 @@ pymilvus #selenium #webdriver-manager SPARQLWrapper -mmh3 \ No newline at end of file +mmh3 +weaviate-client \ No newline at end of file From 961110e061d68395dd1c98341f05c44925a864a7 Mon Sep 17 00:00:00 2001 From: venuraja79 Date: Tue, 8 Jun 2021 15:59:43 +0530 Subject: [PATCH 14/22] Fix the weaviate docker env variables --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9400e9ad0b..c3ba34b33e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -77,7 +77,7 @@ jobs: run: docker run -d -p 19530:19530 -p 19121:19121 milvusdb/milvus:1.1.0-cpu-d050721-5e559c - name: Run Weaviate - run: docker run -d -p 8080:8080 --name haystack_test_weaviate semitechnologies/weaviate:1.3.0 + run: docker run -d -p 8080:8080 --name haystack_test_weaviate --env AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED='true' --env PERSISTENCE_DATA_PATH='/var/lib/weaviate' semitechnologies/weaviate:1.3.0 - name: Run GraphDB run: docker run -d -p 7200:7200 --name haystack_test_graphdb deepset/graphdb-free:9.4.1-adoptopenjdk11 From 0a161f97de4528c430aa79f86e19fb213f5aeb12 Mon Sep 17 00:00:00 2001 From: venuraja79 Date: Tue, 8 Jun 2021 16:28:12 +0530 Subject: [PATCH 15/22] Fixing test dependencies for now --- test/conftest.py | 5 ++--- test/test_weaviate.py | 6 ++++++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/test/conftest.py b/test/conftest.py index 4fdc278a9e..3b25c9f42f 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -327,15 +327,14 @@ def get_retriever(retriever_type, document_store): return retriever -@pytest.fixture(params=["elasticsearch", "faiss", "memory", "sql", "milvus", "weaviate"]) +@pytest.fixture(params=["elasticsearch", "faiss", "memory", "sql", "milvus"]) def document_store_with_docs(request, test_docs_xs): document_store = get_document_store(request.param) document_store.write_documents(test_docs_xs) yield document_store document_store.delete_all_documents() - -@pytest.fixture(params=["elasticsearch", "faiss", "memory", "sql", "milvus", "weaviate"]) +@pytest.fixture(params=["elasticsearch", "faiss", "memory", "sql", "milvus"]) def document_store(request, test_docs_xs): document_store = get_document_store(request.param) yield document_store diff --git a/test/test_weaviate.py b/test/test_weaviate.py index bb9f946980..505ee2310d 100644 --- 
a/test/test_weaviate.py +++ b/test/test_weaviate.py @@ -33,6 +33,12 @@ def document_store_with_docs(request): yield document_store document_store.delete_all_documents() +@pytest.fixture(params=["weaviate"]) +def document_store(request): + document_store = get_document_store(request.param) + yield document_store + document_store.delete_all_documents() + @pytest.mark.parametrize("document_store_with_docs", ["weaviate"], indirect=True) def test_get_all_documents_without_filters(document_store_with_docs): documents = document_store_with_docs.get_all_documents() From 3a0160a532a3d1c2ae836fc5171b966483c495de Mon Sep 17 00:00:00 2001 From: venuraja79 Date: Tue, 8 Jun 2021 22:03:30 +0530 Subject: [PATCH 16/22] Created weaviate test marker and fixed query --- haystack/document_store/weaviate.py | 17 +++++-------- test/conftest.py | 2 ++ test/pytest.ini | 1 + test/test_weaviate.py | 39 +++++++++++++++++++++-------- 4 files changed, 37 insertions(+), 22 deletions(-) diff --git a/haystack/document_store/weaviate.py b/haystack/document_store/weaviate.py index 8aa0dca096..151be3d05b 100644 --- a/haystack/document_store/weaviate.py +++ b/haystack/document_store/weaviate.py @@ -523,7 +523,7 @@ def get_all_documents_generator( def query( self, - query: Optional[str], + query: Optional[str] = None, filters: Optional[Dict[str, List[str]]] = None, top_k: int = 10, custom_query: Optional[str] = None, @@ -536,6 +536,8 @@ def query( :param query: The query :param filters: A dictionary where the keys specify a metadata field and the value is a list of accepted values for that field :param top_k: How many documents to return per query. 
+ :param custom_query: Custom query that will be executed using query.raw method, for more details refer + https://www.semi.technology/developers/weaviate/current/graphql-references/filters.html :param index: The name of the index in the DocumentStore from which to retrieve documents """ index = index or self.index @@ -544,10 +546,6 @@ def query( properties = self._get_current_properties(index) properties.append("_additional {id, certainty, vector}") - query_string = { - "concepts": [query] - } - if custom_query: query_output = self.weaviate_client.query.raw(custom_query) elif filters: @@ -555,15 +553,12 @@ def query( query_output = self.weaviate_client.query\ .get(class_name=index, properties=properties)\ .with_where(filter_dict)\ - .with_near_text(query_string)\ .with_limit(top_k)\ .do() else: - query_output = self.weaviate_client.query\ - .get(class_name=index, properties=properties)\ - .with_near_text(query_string)\ - .with_limit(top_k)\ - .do() + raise NotImplementedError("Weaviate does not support inverted index text query. 
However, " + "it allows to search by filters example : {'text': 'some text'} or " + "use a custom GraphQL query in text format!") results = [] if query_output and "data" in query_output and "Get" in query_output.get("data"): diff --git a/test/conftest.py b/test/conftest.py index 3b25c9f42f..3af5700075 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -64,6 +64,8 @@ def pytest_collection_modifyitems(items): item.add_marker(pytest.mark.pipeline) elif "slow" in item.nodeid: item.add_marker(pytest.mark.slow) + elif "weaviate" in item.nodeid: + item.add_marker(pytest.mark.weaviate) @pytest.fixture(scope="session") diff --git a/test/pytest.ini b/test/pytest.ini index 9da069d5bf..fe36dbb184 100644 --- a/test/pytest.ini +++ b/test/pytest.ini @@ -8,3 +8,4 @@ markers = generator: marks generator tests (deselect with '-m "not generator"') pipeline: marks tests with pipeline summarizer: marks summarizer tests + weaviate: marks tests that require weaviate container diff --git a/test/test_weaviate.py b/test/test_weaviate.py index 505ee2310d..56fc50f823 100644 --- a/test/test_weaviate.py +++ b/test/test_weaviate.py @@ -39,6 +39,7 @@ def document_store(request): yield document_store document_store.delete_all_documents() +@pytest.mark.weaviate @pytest.mark.parametrize("document_store_with_docs", ["weaviate"], indirect=True) def test_get_all_documents_without_filters(document_store_with_docs): documents = document_store_with_docs.get_all_documents() @@ -47,6 +48,7 @@ def test_get_all_documents_without_filters(document_store_with_docs): assert {d.meta["name"] for d in documents} == {"filename1", "filename2", "filename3"} assert {d.meta["metafield"] for d in documents} == {"test1", "test2", "test3"} +@pytest.mark.weaviate def test_get_all_documents_with_correct_filters(document_store_with_docs): documents = document_store_with_docs.get_all_documents(filters={"metafield": ["test2"]}) assert len(documents) == 1 @@ -57,21 +59,24 @@ def 
test_get_all_documents_with_correct_filters(document_store_with_docs): assert {d.meta["name"] for d in documents} == {"filename1", "filename3"} assert {d.meta["metafield"] for d in documents} == {"test1", "test3"} +@pytest.mark.weaviate def test_get_all_documents_with_incorrect_filter_name(document_store_with_docs): documents = document_store_with_docs.get_all_documents(filters={"incorrectmetafield": ["test2"]}) assert len(documents) == 0 - +@pytest.mark.weaviate def test_get_all_documents_with_incorrect_filter_value(document_store_with_docs): documents = document_store_with_docs.get_all_documents(filters={"metafield": ["incorrect_value"]}) assert len(documents) == 0 +@pytest.mark.weaviate def test_get_documents_by_id(document_store_with_docs): documents = document_store_with_docs.get_all_documents() doc = document_store_with_docs.get_document_by_id(documents[0].id) assert doc.id == documents[0].id assert doc.text == documents[0].text +@pytest.mark.weaviate @pytest.mark.parametrize("document_store", ["weaviate"], indirect=True) def test_get_document_count(document_store): document_store.write_documents(DOCUMENTS) @@ -79,6 +84,7 @@ def test_get_document_count(document_store): assert document_store.get_document_count(filters={"key": ["a"]}) == 1 assert document_store.get_document_count(filters={"key": ["b"]}) == 4 +@pytest.mark.weaviate @pytest.mark.parametrize("document_store", ["weaviate"], indirect=True) @pytest.mark.parametrize("batch_size", [2]) def test_weaviate_write_docs(document_store, batch_size): @@ -92,6 +98,7 @@ def test_weaviate_write_docs(document_store, batch_size): documents_indexed = document_store.get_all_documents(batch_size=batch_size) assert len(documents_indexed) == len(DOCUMENTS) +@pytest.mark.weaviate @pytest.mark.parametrize("document_store", ["weaviate"], indirect=True) def test_get_all_document_filter_duplicate_value(document_store): documents = [ @@ -120,12 +127,13 @@ def test_get_all_document_filter_duplicate_value(document_store): 
assert len(documents) == 1 assert {d.meta["metaid"] for d in documents} == {"0"} +@pytest.mark.weaviate @pytest.mark.parametrize("document_store", ["weaviate"], indirect=True) def test_get_all_documents_generator(document_store): document_store.write_documents(DOCUMENTS) assert len(list(document_store.get_all_documents_generator(batch_size=2))) == 5 -@pytest.mark.elasticsearch +@pytest.mark.weaviate @pytest.mark.parametrize("document_store", ["weaviate"], indirect=True) def test_write_with_duplicate_doc_ids(document_store): id = get_uuid() @@ -145,6 +153,7 @@ def test_write_with_duplicate_doc_ids(document_store): with pytest.raises(Exception): document_store.write_documents(documents, duplicate_documents="fail") +@pytest.mark.weaviate @pytest.mark.parametrize("document_store", ["weaviate"], indirect=True) @pytest.mark.parametrize("update_existing_documents", [True, False]) def test_update_existing_documents(document_store, update_existing_documents): @@ -174,6 +183,7 @@ def test_update_existing_documents(document_store, update_existing_documents): else: assert stored_docs[0].text == original_docs[0]["text"] +@pytest.mark.weaviate @pytest.mark.parametrize("document_store", ["weaviate"], indirect=True) def test_write_document_meta(document_store): uid1 = get_uuid() @@ -195,7 +205,7 @@ def test_write_document_meta(document_store): assert not document_store.get_document_by_id(uid3).meta assert document_store.get_document_by_id(uid4).meta["metafield"] == "test4" - +@pytest.mark.weaviate @pytest.mark.parametrize("document_store", ["weaviate"], indirect=True) def test_write_document_index(document_store): documents = [ @@ -212,6 +222,7 @@ def test_write_document_index(document_store): assert len(document_store.get_all_documents(index="Haystackone")) == 1 assert len(document_store.get_all_documents()) == 0 +@pytest.mark.weaviate @pytest.mark.parametrize("retriever", ["dpr", "embedding"], indirect=True) @pytest.mark.parametrize("document_store", ["weaviate"], indirect=True) 
def test_update_embeddings(document_store, retriever): @@ -270,7 +281,7 @@ def test_update_embeddings(document_store, retriever): embedding_after_update = doc_after_update.embedding np.testing.assert_raises(AssertionError, np.testing.assert_array_equal, embedding_before_update, embedding_after_update) - +@pytest.mark.weaviate @pytest.mark.parametrize("document_store_with_docs", ["weaviate"], indirect=True) def test_query_by_embedding(document_store_with_docs): docs = document_store_with_docs.query_by_embedding(np.random.rand(embedding_dim).astype(np.float32)) @@ -284,18 +295,23 @@ def test_query_by_embedding(document_store_with_docs): filters = {"name": ['filename2']}) assert len(docs) == 1 -'''@pytest.mark.parametrize("document_store_with_docs", ["weaviate"], indirect=True) +@pytest.mark.weaviate +@pytest.mark.parametrize("document_store_with_docs", ["weaviate"], indirect=True) def test_query(document_store_with_docs): - query_text = 'live' - docs = document_store_with_docs.query(query_text) - assert len(docs) == 3 + query_text = 'My name is Carla and I live in Berlin' + with pytest.raises(Exception): + docs = document_store_with_docs.query(query_text) + + docs = document_store_with_docs.query(filters = {"name": ['filename2']}) + assert len(docs) == 1 - docs = document_store_with_docs.query(query_text, top_k=1) + docs = document_store_with_docs.query(filters={"text":[query_text.lower()]}) assert len(docs) == 1 - docs = document_store_with_docs.query(query_text, filters = {"name": ['filename2']}) - assert len(docs) == 1''' + docs = document_store_with_docs.query(filters={"text":['live']}) + assert len(docs) == 3 +@pytest.mark.weaviate @pytest.mark.parametrize("document_store_with_docs", ["weaviate"], indirect=True) def test_delete_all_documents(document_store_with_docs): assert len(document_store_with_docs.get_all_documents()) == 3 @@ -304,6 +320,7 @@ def test_delete_all_documents(document_store_with_docs): documents = document_store_with_docs.get_all_documents() 
assert len(documents) == 0 +@pytest.mark.weaviate @pytest.mark.parametrize("document_store_with_docs", ["weaviate"], indirect=True) def test_delete_documents_with_filters(document_store_with_docs): document_store_with_docs.delete_all_documents(filters={"metafield": ["test1", "test2"]}) From 80bc973ed09efeaeccac7fa76dd6541c1289d960 Mon Sep 17 00:00:00 2001 From: Malte Pietsch Date: Wed, 9 Jun 2021 16:34:20 +0200 Subject: [PATCH 17/22] Update docstring --- haystack/document_store/weaviate.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/haystack/document_store/weaviate.py b/haystack/document_store/weaviate.py index 151be3d05b..e654a520d2 100644 --- a/haystack/document_store/weaviate.py +++ b/haystack/document_store/weaviate.py @@ -14,14 +14,14 @@ class WeaviateDocumentStore(BaseDocumentStore): """ - https://www.semi.technology/developers/weaviate/current/index.html#what-is-weaviate + Weaviate is a cloud-native, modular, real-time vector search engine built to scale your machine learning models. - 1. Highly efficient large scale vector searches - 2. State of the Art classification techniques - 3. Easily retrieve your data in graph format using GraphQL - - Weaviate is a document store to store, query the documents for search - Both the document and embedding (generated by Weaviate) are stored in weaviate. + (See https://www.semi.technology/developers/weaviate/current/index.html#what-is-weaviate) + + Some of the key differences in contrast to FAISS & Milvus: + 1. Stores everything in one place: documents, meta data and vectors - so less network overhead when scaling this up + 2. Allows combination of vector search and scalar filtering, i.e. you can filter for a certain tag and do dense retrieval on that subset + 3. Has less variety of ANN algorithms, as of now only HNSW. 
Weaviate python client is used to connect to the server, more details are here https://weaviate-python-client.readthedocs.io/en/docs/weaviate.html From 75725fb1e5d50dfddf7ee02fd1e2ada8015160cb Mon Sep 17 00:00:00 2001 From: Malte Pietsch Date: Wed, 9 Jun 2021 16:47:18 +0200 Subject: [PATCH 18/22] Add documentation --- docs/_src/usage/usage/document_store.md | 41 ++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/docs/_src/usage/usage/document_store.md b/docs/_src/usage/usage/document_store.md index 43baecdb0d..77fb8b4c18 100644 --- a/docs/_src/usage/usage/document_store.md +++ b/docs/_src/usage/usage/document_store.md @@ -116,6 +116,27 @@ from haystack.document_store import SQLDocumentStore document_store = SQLDocumentStore() ``` + + + +
+ + +
+ +The `WeaviateDocumentStore` requires a running Weaviate Server. +You can start a basice instance like this (see Weaviate docs for details): +``` + docker run -d -p 8080:8080 --env AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED='true' --env PERSISTENCE_DATA_PATH='/var/lib/weaviate' semitechnologies/weaviate:1.3.0 +``` + +Afterwards, you can use it in Haystack: +```python +from haystack.document_store import WeaviateDocumentStore + +document_store = WeaviateDocumentStore() +``` +
@@ -264,6 +285,24 @@ The Document Stores have different characteristics. You should choose one depend + +
+ + +
+ +**Pros:** +- Simple vector search +- Stores everything in one place: documents, meta data and vectors - so less network overhead when scaling this up +- Allows combination of vector search and scalar filtering, i.e. you can filter for a certain tag and do dense retrieval on that subset + +**Cons:** +- Less options for ANN algorithms than FAISS or Milvus +- No BM25 / Tf-idf retrieval + +
+
+
@@ -276,4 +315,4 @@ The Document Stores have different characteristics. You should choose one depend **Vector Specialist:** Use the `MilvusDocumentStore`, if you want to focus on dense retrieval and possibly deal with larger datasets -
\ No newline at end of file + From 2382cacc11a9e1fcd5057b4d3193d6cb2d371eca Mon Sep 17 00:00:00 2001 From: venuraja79 Date: Thu, 10 Jun 2021 08:45:02 +0530 Subject: [PATCH 19/22] Bump up weaviate version --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c3ba34b33e..29e2bb9649 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -77,7 +77,7 @@ jobs: run: docker run -d -p 19530:19530 -p 19121:19121 milvusdb/milvus:1.1.0-cpu-d050721-5e559c - name: Run Weaviate - run: docker run -d -p 8080:8080 --name haystack_test_weaviate --env AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED='true' --env PERSISTENCE_DATA_PATH='/var/lib/weaviate' semitechnologies/weaviate:1.3.0 + run: docker run -d -p 8080:8080 --name haystack_test_weaviate --env AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED='true' --env PERSISTENCE_DATA_PATH='/var/lib/weaviate' semitechnologies/weaviate:1.4.0 - name: Run GraphDB run: docker run -d -p 7200:7200 --name haystack_test_graphdb deepset/graphdb-free:9.4.1-adoptopenjdk11 From ec5ba0d37a760ca1e57bfb364627a03e72374628 Mon Sep 17 00:00:00 2001 From: venuraja79 Date: Thu, 10 Jun 2021 09:02:44 +0530 Subject: [PATCH 20/22] Bump up weaviate version in documentation --- docs/_src/usage/usage/document_store.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/_src/usage/usage/document_store.md b/docs/_src/usage/usage/document_store.md index 77fb8b4c18..7e3c3549b0 100644 --- a/docs/_src/usage/usage/document_store.md +++ b/docs/_src/usage/usage/document_store.md @@ -127,7 +127,7 @@ document_store = SQLDocumentStore() The `WeaviateDocumentStore` requires a running Weaviate Server. 
You can start a basice instance like this (see Weaviate docs for details): ``` - docker run -d -p 8080:8080 --env AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED='true' --env PERSISTENCE_DATA_PATH='/var/lib/weaviate' semitechnologies/weaviate:1.3.0 + docker run -d -p 8080:8080 --env AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED='true' --env PERSISTENCE_DATA_PATH='/var/lib/weaviate' semitechnologies/weaviate:1.4.0 ``` Afterwards, you can use it in Haystack: From 80162fb8f2fe5d76b27ac2c452469cfa62cb15dd Mon Sep 17 00:00:00 2001 From: venuraja79 Date: Thu, 10 Jun 2021 09:03:42 +0530 Subject: [PATCH 21/22] Bump up weaviate version in documentation --- docs/_src/usage/usage/document_store.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/_src/usage/usage/document_store.md b/docs/_src/usage/usage/document_store.md index 7e3c3549b0..bd4e5b3f4e 100644 --- a/docs/_src/usage/usage/document_store.md +++ b/docs/_src/usage/usage/document_store.md @@ -125,7 +125,7 @@ document_store = SQLDocumentStore()
The `WeaviateDocumentStore` requires a running Weaviate Server. -You can start a basice instance like this (see Weaviate docs for details): +You can start a basic instance like this (see Weaviate docs for details): ``` docker run -d -p 8080:8080 --env AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED='true' --env PERSISTENCE_DATA_PATH='/var/lib/weaviate' semitechnologies/weaviate:1.4.0 ``` Afterwards, you can use it in Haystack: From 7829884415082948b3585184c91908b27fbe152a Mon Sep 17 00:00:00 2001 From: Malte Pietsch Date: Thu, 10 Jun 2021 09:01:36 +0200 Subject: [PATCH 22/22] Upgrade weaviate version --- test/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/conftest.py b/test/conftest.py index 3af5700075..ceeb4a7182 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -117,7 +117,7 @@ def weaviate_fixture(): shell=True ) status = subprocess.run( - ['docker run -d --name haystack_test_weaviate -p 8080:8080 semitechnologies/weaviate:1.3.0'], + ['docker run -d --name haystack_test_weaviate -p 8080:8080 semitechnologies/weaviate:1.4.0'], shell=True ) if status.returncode: