Merged
52 commits
de930a4
feat(providers/weaviate): update min weaviate-client version to 4.4.0
Lee-W Jun 12, 2024
88fcf40
feat(providers/weaviate): update airflow connection to v4 style
Lee-W Jun 12, 2024
d0a47ba
feat(providers/weaviate): add http_secure and grpc_secure
Lee-W Jun 13, 2024
41f25d5
feat(providers/weaviate): migrate test_connections to v4 API
Lee-W Jun 13, 2024
c366d54
feat(providers/weaviate): migrate create_class to create_collection
Lee-W Jun 13, 2024
cf282b0
feat(providers/weaviate): migrate get_schema to get_collection_config…
Lee-W Jun 13, 2024
63dd7a6
feat(providers/weaviate): migrate delete_classes to delete_collections
Lee-W Jun 13, 2024
814ca57
feat(providers/weaviate): migrate query_with_vector to v4 API
Lee-W Jun 14, 2024
b505d2a
feat(providers/weaviate): migrate query_with_text to v4 API
Lee-W Jun 14, 2024
35aadc6
feat(providers/weaviate): migrate create_object to v4 API
Lee-W Jun 14, 2024
f1cda43
feat(providers/weaviate): migrate get_object to v4 API
Lee-W Jun 14, 2024
95f9db4
feat(providers/weaviate): migrate delete_object to v4 API
Lee-W Jun 14, 2024
c6b3e68
feat(providers/weaviate): migrate update_object to v4 API
Lee-W Jun 14, 2024
52495de
feat(providers/weaviate): migrate replace_object to v4 API
Lee-W Jun 17, 2024
2ade84d
feat(providers/weaviate): migrate object_exists to v4 API
Lee-W Jun 17, 2024
5b84c85
feat(providers/weaviate): migrate _generate_uuids to v4 API
Lee-W Jun 14, 2024
84a1ce3
refactor(providers/weaviate): extract common get collection logic
Lee-W Jun 14, 2024
45dd131
feat(providers/weaviate): migrate batch_data to v4 API
Lee-W Jun 14, 2024
0f1d956
feat(providers/weaviate): migrate get_or_create_object to v4 API
Lee-W Jun 14, 2024
9539b46
feat(providers/weaviate): migrate _delete_objects to v4 API
Lee-W Jun 14, 2024
52af6ca
feat(providers/weaviate): migrate _delete_all_documents_objects to v4…
Lee-W Jun 14, 2024
780305b
feat(providers/weaviate): migrate _get_documents_to_uuid_map to v4 API
Lee-W Jun 17, 2024
5e37b00
feat(providers/weaviate): migrate _get_segregated_documents to v4 API
Lee-W Jun 17, 2024
1c4f0ec
feat(providers/weaviate): migrate create_or_replace_document_objects …
Lee-W Jun 17, 2024
0d442a0
feat(providers/weaviate): remove validate_object
Lee-W Jun 14, 2024
a600004
refactor(providers/weaviate): remove unused retry_status_codes
Lee-W Jun 13, 2024
db5bfc7
refactor(providers/weaviate): remove deprecated get_client
Lee-W Jun 17, 2024
3849417
feat(providers/weaviate): migrate update_config to v4 API
Lee-W Jun 17, 2024
9f34735
feat(providers/weaviate): remove create_schema and delete_all_schema …
Lee-W Jun 17, 2024
fd595ac
feat(providers/weaviate): remove create_or_replace_classes as there's…
Lee-W Jun 17, 2024
060c92e
feat(providers/weaviate): remove _compare_schema_subset, _convert_pro…
Lee-W Jun 17, 2024
32f8505
feat(providers/weaviate): migrate WeaviateIngestOperator to v4 API
Lee-W Jun 17, 2024
bdff77f
feat(providers/weaviate): migrate WeaviateDocumentIngestOperator to v…
Lee-W Jun 17, 2024
d341df7
fix(providers/weaviate): fix connect default port
Lee-W Jun 18, 2024
de825ba
refactor(provider/weaviate): rename update_config as update_collectio…
Lee-W Jun 18, 2024
d7e51ec
fix(providers/weaviate): fix batch_data wrong parameter used
Lee-W Jun 18, 2024
49e87d8
fix(providers/weaviate): fix wrong v4 API calls in get_all_objects
Lee-W Jun 18, 2024
b07725e
fix(providers/weaviate): fix get_all_object with as_dataframe set to …
Lee-W Jun 18, 2024
391a01f
fix(providers/weaviate): fix create_or_replace_document_objects
Lee-W Jun 19, 2024
87b9125
refactor(providers/weaviate): rename query_without_vector as query_wi…
Lee-W Jun 19, 2024
aabb499
feat(providers/weaviate): migrate operators to v4 API hook
Lee-W Jun 19, 2024
86fcd03
refactor(providers/weavite): group similar methods together
Lee-W Jun 19, 2024
93abecd
test(providers/weaviate): update unit tests for operators
Lee-W Jun 20, 2024
756dc67
test(providers/weaviate): fix hooks tests due to API migration
Lee-W Jun 20, 2024
6593f17
test(providers/weaviate): migrate system tests to v4 API
Lee-W Jun 20, 2024
95d159c
docs(providers/weaviate): update doc for v4 API migration
Lee-W Jun 20, 2024
f8fa89b
test(providers/weaviate): fix hooks tests due to API migration
Lee-W Jun 20, 2024
8a2af9c
style(providers/weaviate): fix mypy warnings
Lee-W Jun 20, 2024
7e3a8ae
docs(providers/weaviate): update changelog
Lee-W Jun 25, 2024
e881d70
docs(providers/weaviate): add more detail description to host
Lee-W Jun 25, 2024
a2224e7
docs(providers/weaviate): fix changelog rst format
Lee-W Jun 25, 2024
8f6f4ae
build(providers/weaviate): add 2.0.0 to provider metadata
Lee-W Jun 25, 2024
20 changes: 18 additions & 2 deletions airflow/providers/weaviate/CHANGELOG.rst
@@ -20,8 +20,24 @@
Changelog
---------

1.4.2
.....
2.0.0
......


Breaking changes
~~~~~~~~~~~~~~~~

.. warning::
* We bumped the minimum version of weaviate-client to 4.4.0. Many concepts and methods have changed. We suggest reading `Migrate from v3 to v4 <https://weaviate.io/developers/weaviate/client-libraries/python/v3_v4_migration>`_ before upgrading to this version.

* Add the fields ``Port``, ``gRPC host`` and ``gRPC port`` and the options ``Use https`` and ``Use a secure channel for the underlying gRPC API`` to the Weaviate connection. The Airflow provider's default values may not suit your Weaviate deployment, so we recommend specifying these values explicitly.
* Update ``WeaviateIngestOperator`` and ``WeaviateDocumentIngestOperator`` to use ``WeaviateHook`` with ``weaviate-client`` v4 API. The major changes are changing argument ``class_name`` to ``collection_name`` and removing ``batch_params``.
* Update ``WeaviateHook`` to utilize ``weaviate-client`` v4 API. The implementation has been extensively changed. We recommend reading `Migrate from v3 to v4 <https://weaviate.io/developers/weaviate/client-libraries/python/v3_v4_migration>`_ to understand the changes on the Weaviate side before using the updated ``WeaviateHook``.
* Migrate the following ``WeaviateHook`` public methods to v4 API: ``test_connections``, ``query_with_vector``, ``create_object``, ``get_object``, ``delete_object``, ``update_object``, ``replace_object``, ``object_exists``, ``batch_data``, ``get_or_create_object``, ``create_or_replace_document_objects``
* Rename ``WeaviateHook`` public methods ``update_schema`` as ``update_collection_configuration``, ``create_class`` as ``create_collection``, ``get_schema`` as ``get_collection_configuration``, ``delete_classes`` as ``delete_collections`` and ``query_without_vector`` as ``query_with_text``.
* Remove the following ``WeaviateHook`` public methods: ``validate_object``, ``update_schema``, ``create_schema``, ``delete_all_schema``, ``check_subset_of_schema``
* Remove deprecated method ``WeaviateHook.get_client``
* Remove unused argument ``retry_status_codes`` in ``WeaviateHook.__init__``
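The renames and removals listed above can be turned into a quick text scan over existing DAG code. The following is a minimal sketch, not part of the provider: the helper name `find_legacy_calls` and the two lookup tables are hypothetical, and the scan matches any attribute call with one of these names, not only calls on a `WeaviateHook` instance.

```python
from __future__ import annotations

import re

# Renames taken from the changelog entry above.
RENAMED = {
    "update_schema": "update_collection_configuration",
    "create_class": "create_collection",
    "get_schema": "get_collection_configuration",
    "delete_classes": "delete_collections",
    "query_without_vector": "query_with_text",
}

# Removals taken from the changelog entry above. The changelog also lists
# ``update_schema`` under removals; this sketch reports it as a rename only.
REMOVED = {
    "validate_object",
    "create_schema",
    "delete_all_schema",
    "check_subset_of_schema",
    "get_client",
}

# Matches ``.name(`` so that only method-style calls are reported.
_CALL = re.compile(r"\.(\w+)\s*\(")


def find_legacy_calls(source: str) -> list[tuple[int, str, str | None]]:
    """Return (line number, old name, new name or None) for each legacy call."""
    findings: list[tuple[int, str, str | None]] = []
    for lineno, line in enumerate(source.splitlines(), start=1):
        for name in _CALL.findall(line):
            if name in RENAMED:
                findings.append((lineno, name, RENAMED[name]))
            elif name in REMOVED:
                findings.append((lineno, name, None))
    return findings
```

A `None` in the third position means the changelog lists the method as removed without a direct replacement, so the call site needs a manual rewrite against the v4 API.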

Misc
~~~~
804 changes: 297 additions & 507 deletions airflow/providers/weaviate/hooks/weaviate.py

Large diffs are not rendered by default.

37 changes: 13 additions & 24 deletions airflow/providers/weaviate/operators/weaviate.py
@@ -27,6 +27,7 @@

if TYPE_CHECKING:
import pandas as pd
from weaviate.types import UUID

from airflow.utils.context import Context

@@ -43,11 +44,10 @@ class WeaviateIngestOperator(BaseOperator):
custom vectors and store them in the Weaviate class.

:param conn_id: The Weaviate connection.
:param class_name: The Weaviate class to be used for storing the data objects into.
:param collection_name: The Weaviate collection to be used for storing the data objects into.
:param input_data: The list of dicts or pandas dataframe representing Weaviate data objects to generate
embeddings on (or provides custom vectors) and store them in the Weaviate class.
:param vector_col: key/column name in which the vectors are stored.
:param batch_params: Additional parameters for Weaviate batch configuration.
:param hook_params: Optional config params to be passed to the underlying hook.
Should match the desired hook constructor params.
:param input_json: (Deprecated) The JSON representing Weaviate data objects to generate embeddings on
@@ -59,25 +59,23 @@ class WeaviateIngestOperator(BaseOperator):
def __init__(
self,
conn_id: str,
class_name: str,
collection_name: str,
input_data: list[dict[str, Any]] | pd.DataFrame | None = None,
vector_col: str = "Vector",
uuid_column: str = "id",
tenant: str | None = None,
batch_params: dict | None = None,
hook_params: dict | None = None,
input_json: list[dict[str, Any]] | pd.DataFrame | None = None,
**kwargs: Any,
) -> None:
super().__init__(**kwargs)
self.class_name = class_name
self.collection_name = collection_name
self.conn_id = conn_id
self.vector_col = vector_col
self.input_json = input_json
self.uuid_column = uuid_column
self.tenant = tenant
self.input_data = input_data
self.batch_params = batch_params or {}
self.hook_params = hook_params or {}

if (self.input_data is None) and (input_json is not None):
@@ -96,18 +94,14 @@ def hook(self) -> WeaviateHook:
"""Return an instance of the WeaviateHook."""
return WeaviateHook(conn_id=self.conn_id, **self.hook_params)

def execute(self, context: Context) -> list:
def execute(self, context: Context) -> None:
self.log.debug("Input data: %s", self.input_data)
insertion_errors: list = []
self.hook.batch_data(
class_name=self.class_name,
collection_name=self.collection_name,
data=self.input_data,
batch_config_params=self.batch_params,
vector_col=self.vector_col,
uuid_col=self.uuid_column,
tenant=self.tenant,
)
return insertion_errors


class WeaviateDocumentIngestOperator(BaseOperator):
@@ -132,12 +126,11 @@ class WeaviateDocumentIngestOperator(BaseOperator):
error: raise an error if an attempt is made to create an object belonging to an existing document.

:param data: A single pandas DataFrame or a list of dicts to be ingested.
:param class_name: Name of the class in Weaviate schema where data is to be ingested.
:param collection_name: Name of the collection in Weaviate schema where data is to be ingested.
:param existing: Strategy for handling existing data: 'skip', or 'replace'. Default is 'skip'.
:param document_column: Column in the DataFrame that identifies the source document.
:param uuid_column: Column with pre-generated UUIDs. If not provided, UUIDs will be generated.
:param vector_column: Column with embedding vectors for pre-embedded data.
:param batch_config_params: Additional parameters for Weaviate batch configuration.
:param tenant: The tenant to which the object will be added.
:param verbose: Flag to enable verbose output during the ingestion process.
:param hook_params: Optional config params to be passed to the underlying hook.
@@ -150,12 +143,11 @@ def __init__(
self,
conn_id: str,
input_data: pd.DataFrame | list[dict[str, Any]] | list[pd.DataFrame],
class_name: str,
collection_name: str,
document_column: str,
existing: str = "skip",
uuid_column: str = "id",
vector_col: str = "Vector",
batch_config_params: dict | None = None,
tenant: str | None = None,
verbose: bool = False,
hook_params: dict | None = None,
@@ -164,12 +156,11 @@ def __init__(
super().__init__(**kwargs)
self.conn_id = conn_id
self.input_data = input_data
self.class_name = class_name
self.collection_name = collection_name
self.document_column = document_column
self.existing = existing
self.uuid_column = uuid_column
self.vector_col = vector_col
self.batch_config_params = batch_config_params
self.tenant = tenant
self.verbose = verbose
self.hook_params = hook_params or {}
@@ -179,22 +170,20 @@ def hook(self) -> WeaviateHook:
"""Return an instance of the WeaviateHook."""
return WeaviateHook(conn_id=self.conn_id, **self.hook_params)

def execute(self, context: Context) -> list:
def execute(self, context: Context) -> Sequence[dict[str, UUID | str] | None]:
"""
Create or replace objects belonging to documents.

:return: List of UUID which failed to create
"""
self.log.debug("Total input objects : %s", len(self.input_data))
insertion_errors = self.hook.create_or_replace_document_objects(
batch_delete_error = self.hook.create_or_replace_document_objects(
data=self.input_data,
class_name=self.class_name,
collection_name=self.collection_name,
document_column=self.document_column,
existing=self.existing,
uuid_column=self.uuid_column,
vector_column=self.vector_col,
batch_config_params=self.batch_config_params,
tenant=self.tenant,
verbose=self.verbose,
)
return insertion_errors
return batch_delete_error
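
For callers of the two operators, the diff above amounts to one renamed keyword argument (`class_name` to `collection_name`) and two dropped ones (`batch_params` on `WeaviateIngestOperator`, `batch_config_params` on `WeaviateDocumentIngestOperator`). A minimal sketch of adapting an existing keyword dictionary follows. The helper `migrate_operator_kwargs` is hypothetical and not part of the provider, and it operates on plain dicts so it runs without Airflow installed.

```python
from __future__ import annotations

from typing import Any

# Keyword arguments renamed by this PR (from the operator diff above).
RENAMED_KWARGS = {"class_name": "collection_name"}

# Keyword arguments removed by this PR (from the operator diff above).
REMOVED_KWARGS = {"batch_params", "batch_config_params"}


def migrate_operator_kwargs(kwargs: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of ``kwargs`` adapted to the 2.0.0 operator signatures."""
    migrated: dict[str, Any] = {}
    for key, value in kwargs.items():
        if key in REMOVED_KWARGS:
            # Dropped silently here; a real migration may want to log this.
            continue
        new_key = RENAMED_KWARGS.get(key, key)
        if new_key in migrated:
            # Both the old and the new spelling were supplied.
            raise ValueError(f"duplicate value for {new_key!r}")
        migrated[new_key] = value
    return migrated
```

The returned dictionary can then be unpacked into the operator constructor, for example `WeaviateIngestOperator(task_id=..., **migrated)`. Any batching behaviour previously tuned through the removed arguments has to be reviewed separately, since the diff shows no replacement parameter for them.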
4 changes: 2 additions & 2 deletions airflow/providers/weaviate/provider.yaml
@@ -28,7 +28,7 @@ source-date-epoch: 1718605569

# note that those versions are maintained by release manager - do not update them manually
versions:
- 1.4.2
- 2.0.0
- 1.4.1
- 1.4.0
- 1.3.4
@@ -50,7 +50,7 @@ integrations:
dependencies:
- apache-airflow>=2.7.0
- httpx>=0.25.0
- weaviate-client>=3.24.2
- weaviate-client>=4.4.0
# In pandas 2.2 minimal version of the sqlalchemy is 2.0
# https://pandas.pydata.org/docs/whatsnew/v2.2.0.html#increased-minimum-versions-for-dependencies
# However, Airflow does not fully support it yet: https://github.com/apache/airflow/issues/28723
20 changes: 18 additions & 2 deletions docs/apache-airflow-providers-weaviate/connections.rst
@@ -31,14 +31,17 @@ Configuring the Connection
--------------------------

Host (required)
Host URL to connect to the Weaviate cluster.
The host to use for the Weaviate cluster REST and GraphQL API calls. DO NOT include the scheme (i.e., http or https).

OIDC Username (optional)
Username for the OIDC user when OIDC option is to be used for authentication.

OIDC Password (optional)
Password for the OIDC user when OIDC option is to be used for authentication.

Port (optional)
The port to use for the Weaviate cluster REST and GraphQL API calls.

Extra (optional)
Specify the extra parameters (as json dictionary) that can be used in the
connection. All parameters are optional.
@@ -48,11 +51,24 @@ Extra (optional)
* If you'd like to use Vectorizers for your class, configure the API keys to use the corresponding
embedding API. The extras accepts a key ``additional_headers`` containing the dictionary
of API keys for the embedding API authentication. They are mentioned in a section here:
`addtional_headers <https://weaviate.io/developers/academy/zero_to_mvp/hello_weaviate/hands_on#-client-instantiation>`__
`Third party API keys <https://weaviate.io/developers/weaviate/starter-guides/connect#third-party-api-keys>`__

Weaviate API Token (optional)
Specify your Weaviate API Key to connect when API Key option is to be used for authentication.

Use https (optional)
Whether to use https for the Weaviate cluster REST and GraphQL API calls.

gRPC host (optional)
The host to use for the Weaviate cluster gRPC API.

gRPC port (optional)
The port to use for the Weaviate cluster gRPC API.

Use a secure channel for the underlying gRPC API (optional)
Whether to use a secure channel for the Weaviate cluster gRPC API.
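
Because the ``Host`` field must no longer carry the scheme, an existing v3-style URL has to be split into host, port and the ``Use https`` flag. A minimal sketch of that split, assuming the old value was a full ``http(s)://`` URL. The helper name `split_weaviate_url` and the dictionary keys are illustrative only and are not the provider's actual field names.

```python
from __future__ import annotations

from urllib.parse import urlsplit


def split_weaviate_url(url: str) -> dict[str, object]:
    """Split a full Weaviate URL into the parts the connection form asks for."""
    parts = urlsplit(url)
    if parts.scheme not in ("http", "https") or not parts.hostname:
        raise ValueError(f"expected an http(s) URL, got {url!r}")
    return {
        "host": parts.hostname,  # goes into "Host", without the scheme
        "port": parts.port,  # goes into "Port"; None if the URL has no port
        "use_https": parts.scheme == "https",  # maps to "Use https"
    }
```

The gRPC host, gRPC port and secure-channel option cannot be derived from the REST URL and still have to be filled in from the cluster's own configuration.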


Supported Authentication Methods
--------------------------------
* API Key Authentication: This method uses the Weaviate API Key to authenticate the connection. You can either have the
2 changes: 1 addition & 1 deletion docs/apache-airflow-providers-weaviate/index.rst
@@ -104,7 +104,7 @@ PIP package Version required
=================== =========================================
``apache-airflow`` ``>=2.7.0``
``httpx`` ``>=0.25.0``
``weaviate-client`` ``>=3.24.2``
``weaviate-client`` ``>=4.4.0``
``pandas`` ``>=2.1.2,<2.2; python_version >= "3.9"``
``pandas`` ``>=1.5.3,<2.2; python_version < "3.9"``
=================== =========================================
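
Since the minimum ``weaviate-client`` version moves from 3.24.2 to 4.4.0, an environment can be checked before upgrading the provider. The sketch below uses only the standard library. The helper names are hypothetical, and the comparison looks at the numeric release segment only, so a pre-release such as ``4.4.0rc1`` is treated like ``4.4.0``.

```python
from __future__ import annotations

import re
from importlib import metadata

MINIMUM_CLIENT_VERSION = "4.4.0"  # from the requirements table above


def release_tuple(version: str) -> tuple[int, ...]:
    """Extract the leading numeric release segment, e.g. '4.4.0' -> (4, 4, 0)."""
    match = re.match(r"\d+(?:\.\d+)*", version)
    if match is None:
        raise ValueError(f"unrecognised version string: {version!r}")
    return tuple(int(part) for part in match.group().split("."))


def meets_minimum(installed: str, minimum: str = MINIMUM_CLIENT_VERSION) -> bool:
    """Compare release segments after padding them to equal length with zeros."""
    a, b = release_tuple(installed), release_tuple(minimum)
    width = max(len(a), len(b))
    a += (0,) * (width - len(a))
    b += (0,) * (width - len(b))
    return a >= b


def installed_client_version() -> str | None:
    """Return the installed weaviate-client version, or None if it is absent."""
    try:
        return metadata.version("weaviate-client")
    except metadata.PackageNotFoundError:
        return None
```

A pre-upgrade check can call `installed_client_version()` and pass a non-`None` result to `meets_minimum`. For strict PEP 440 semantics a dedicated version-parsing library would be the safer choice.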
2 changes: 1 addition & 1 deletion generated/provider_dependencies.json
@@ -1319,7 +1319,7 @@
"httpx>=0.25.0",
"pandas>=1.5.3,<2.2;python_version<\"3.9\"",
"pandas>=2.1.2,<2.2;python_version>=\"3.9\"",
"weaviate-client>=3.24.2"
"weaviate-client>=4.4.0"
],
"devel-deps": [],
"plugins": [],