From 744d69c3fc726847a9164602a9ff765a9282aa6c Mon Sep 17 00:00:00 2001 From: Vadim Vladimirov Date: Mon, 15 Jan 2024 16:44:05 +0700 Subject: [PATCH 1/4] refactor: move credentials logic to utils --- airflow/providers/yandex/hooks/yandex.py | 122 +++++-------- airflow/providers/yandex/utils/__init__.py | 16 ++ airflow/providers/yandex/utils/credentials.py | 100 +++++++++++ airflow/providers/yandex/utils/defaults.py | 22 +++ airflow/providers/yandex/utils/fields.py | 42 +++++ airflow/providers/yandex/utils/user_agent.py | 48 +++++ tests/providers/yandex/hooks/test_yandex.py | 152 ++++++++-------- .../yandex/hooks/test_yandexcloud_dataproc.py | 56 +++--- .../operators/test_yandexcloud_dataproc.py | 38 ++-- tests/providers/yandex/utils/__init__.py | 16 ++ .../yandex/utils/test_credentials.py | 168 ++++++++++++++++++ tests/providers/yandex/utils/test_defaults.py | 16 ++ tests/providers/yandex/utils/test_fields.py | 83 +++++++++ .../providers/yandex/utils/test_user_agent.py | 52 ++++++ 14 files changed, 725 insertions(+), 206 deletions(-) create mode 100644 airflow/providers/yandex/utils/__init__.py create mode 100644 airflow/providers/yandex/utils/credentials.py create mode 100644 airflow/providers/yandex/utils/defaults.py create mode 100644 airflow/providers/yandex/utils/fields.py create mode 100644 airflow/providers/yandex/utils/user_agent.py create mode 100644 tests/providers/yandex/utils/__init__.py create mode 100644 tests/providers/yandex/utils/test_credentials.py create mode 100644 tests/providers/yandex/utils/test_defaults.py create mode 100644 tests/providers/yandex/utils/test_fields.py create mode 100644 tests/providers/yandex/utils/test_user_agent.py diff --git a/airflow/providers/yandex/hooks/yandex.py b/airflow/providers/yandex/hooks/yandex.py index 02bf037ae5075..67afc2d59f718 100644 --- a/airflow/providers/yandex/hooks/yandex.py +++ b/airflow/providers/yandex/hooks/yandex.py @@ -16,27 +16,37 @@ # under the License. 
from __future__ import annotations -import json import warnings from typing import Any import yandexcloud -from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.hooks.base import BaseHook +from airflow.providers.yandex.utils.credentials import ( + get_credentials, + get_service_account_id, +) +from airflow.providers.yandex.utils.defaults import conn_name_attr, conn_type, default_conn_name, hook_name +from airflow.providers.yandex.utils.fields import get_field_from_extras +from airflow.providers.yandex.utils.user_agent import provider_user_agent class YandexCloudBaseHook(BaseHook): """ A base hook for Yandex.Cloud related tasks. - :param yandex_conn_id: The connection ID to use when fetching connection info. + :param yandex_conn_id: The connection ID to use when fetching connection info + :param connection_id: Deprecated, use yandex_conn_id instead + :param default_folder_id: The folder ID to use instead of connection folder ID + :param default_public_ssh_key: The key to use instead of connection key + :param default_service_account_id: The service account ID to use instead of key service account ID """ - conn_name_attr = "yandex_conn_id" - default_conn_name = "yandexcloud_default" - conn_type = "yandexcloud" - hook_name = "Yandex Cloud" + conn_name_attr = conn_name_attr + default_conn_name = default_conn_name + conn_type = conn_type + hook_name = hook_name @classmethod def get_connection_form_widgets(cls) -> dict[str, Any]: @@ -50,14 +60,14 @@ def get_connection_form_widgets(cls) -> dict[str, Any]: lazy_gettext("Service account auth JSON"), widget=BS3PasswordFieldWidget(), description="Service account auth JSON. Looks like " - '{"id", "...", "service_account_id": "...", "private_key": "..."}. ' + '{"id": "...", "service_account_id": "...", "private_key": "..."}. 
' "Will be used instead of OAuth token and SA JSON file path field if specified.", ), "service_account_json_path": StringField( lazy_gettext("Service account auth JSON file path"), widget=BS3TextFieldWidget(), description="Service account auth JSON file path. File content looks like " - '{"id", "...", "service_account_id": "...", "private_key": "..."}. ' + '{"id": "...", "service_account_id": "...", "private_key": "..."}. ' "Will be used instead of OAuth token if specified.", ), "oauth": PasswordField( @@ -75,7 +85,7 @@ def get_connection_form_widgets(cls) -> dict[str, Any]: "public_ssh_key": StringField( lazy_gettext("Public SSH key"), widget=BS3TextFieldWidget(), - description="Optional. This key will be placed to all created Compute nodes" + description="Optional. This key will be placed to all created Compute nodes " "to let you have a root shell there", ), "endpoint": StringField( @@ -87,30 +97,13 @@ def get_connection_form_widgets(cls) -> dict[str, Any]: @classmethod def provider_user_agent(cls) -> str | None: - """Construct User-Agent from Airflow core & provider package versions.""" - from airflow import __version__ as airflow_version - from airflow.configuration import conf - from airflow.providers_manager import ProvidersManager - - try: - manager = ProvidersManager() - provider_name = manager.hooks[cls.conn_type].package_name # type: ignore[union-attr] - provider = manager.providers[provider_name] - return " ".join( - ( - conf.get("yandex", "sdk_user_agent_prefix", fallback=""), - f"apache-airflow/{airflow_version}", - f"{provider_name}/{provider.version}", - ) - ).strip() - except KeyError: - warnings.warn( - f"Hook '{cls.hook_name}' info is not initialized in airflow.ProviderManager", - UserWarning, - stacklevel=2, - ) - - return None + warnings.warn( + "Using `provider_user_agent` in `YandexCloudBaseHook` is deprecated. 
" + "Please use it in `utils.user_agent` instead.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) + return provider_user_agent() @classmethod def get_ui_field_behaviour(cls) -> dict[str, Any]: @@ -122,7 +115,7 @@ def get_ui_field_behaviour(cls) -> dict[str, Any]: def __init__( self, - # Connection id is deprecated. Use yandex_conn_id instead + # connection_id is deprecated, use yandex_conn_id instead connection_id: str | None = None, yandex_conn_id: str | None = None, default_folder_id: str | None = None, @@ -137,46 +130,23 @@ def __init__( AirflowProviderDeprecationWarning, stacklevel=2, ) - self.connection_id = yandex_conn_id or connection_id or self.default_conn_name + self.connection_id = yandex_conn_id or connection_id or default_conn_name self.connection = self.get_connection(self.connection_id) self.extras = self.connection.extra_dejson - credentials = self._get_credentials() + credentials = get_credentials( + oauth_token=self._get_field("oauth"), + service_account_json=self._get_field("service_account_json"), + service_account_json_path=self._get_field("service_account_json_path"), + ) sdk_config = self._get_endpoint() - self.sdk = yandexcloud.SDK(user_agent=self.provider_user_agent(), **sdk_config, **credentials) + self.sdk = yandexcloud.SDK(user_agent=provider_user_agent(), **sdk_config, **credentials) self.default_folder_id = default_folder_id or self._get_field("folder_id") self.default_public_ssh_key = default_public_ssh_key or self._get_field("public_ssh_key") - self.default_service_account_id = default_service_account_id or self._get_service_account_id() - self.client = self.sdk.client - - def _get_service_account_key(self) -> dict[str, str] | None: - service_account_json = self._get_field("service_account_json") - service_account_json_path = self._get_field("service_account_json_path") - if service_account_json_path: - with open(service_account_json_path) as infile: - service_account_json = infile.read() - if service_account_json: - 
return json.loads(service_account_json) - return None - - def _get_service_account_id(self) -> str | None: - sa_key = self._get_service_account_key() - if sa_key: - return sa_key.get("service_account_id") - return None - - def _get_credentials(self) -> dict[str, Any]: - oauth_token = self._get_field("oauth") - if oauth_token: - return {"token": oauth_token} - - service_account_key = self._get_service_account_key() - if service_account_key: - return {"service_account_key": service_account_key} - - raise AirflowException( - "No credentials are found in connection. Specify either service account " - "authentication JSON or user OAuth token in Yandex.Cloud connection" + self.default_service_account_id = default_service_account_id or get_service_account_id( + service_account_json=self._get_field("service_account_json"), + service_account_json_path=self._get_field("service_account_json_path"), ) + self.client = self.sdk.client def _get_endpoint(self) -> dict[str, str]: sdk_config = {} @@ -186,18 +156,6 @@ def _get_endpoint(self) -> dict[str, str]: return sdk_config def _get_field(self, field_name: str, default: Any = None) -> Any: - """Get field from extra, first checking short name, then for backcompat we check for prefixed name.""" if not hasattr(self, "extras"): return default - backcompat_prefix = "extra__yandexcloud__" - if field_name.startswith("extra__"): - raise ValueError( - f"Got prefixed name {field_name}; please remove the '{backcompat_prefix}' prefix " - "when using this method." 
- ) - if field_name in self.extras: - return self.extras[field_name] - prefixed_name = f"{backcompat_prefix}{field_name}" - if prefixed_name in self.extras: - return self.extras[prefixed_name] - return default + return get_field_from_extras(self.extras, field_name, default) diff --git a/airflow/providers/yandex/utils/__init__.py b/airflow/providers/yandex/utils/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow/providers/yandex/utils/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/yandex/utils/credentials.py b/airflow/providers/yandex/utils/credentials.py new file mode 100644 index 0000000000000..f54e8bdfbedfc --- /dev/null +++ b/airflow/providers/yandex/utils/credentials.py @@ -0,0 +1,100 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import json +import logging +from typing import Any + +log = logging.getLogger(__name__) + + +def get_credentials( + oauth_token: str | None = None, + service_account_json: dict | str | None = None, + service_account_json_path: str | None = None, +) -> dict[str, Any]: + """ + Return credentials JSON for Yandex Cloud SDK based on credentials. + + Credentials will be used with this priority: + + * OAuth Token + * Service Account JSON file + * Service Account JSON + * Metadata Service + + :param oauth_token: OAuth Token + :param service_account_json: Service Account JSON key or dict + :param service_account_json_path: Service Account JSON key file path + :return: Credentials JSON + """ + if oauth_token: + return {"token": oauth_token} + + service_account_key = get_service_account_key( + service_account_json=service_account_json, + service_account_json_path=service_account_json_path, + ) + if service_account_key: + return {"service_account_key": service_account_key} + + log.info("using metadata service as credentials") + return {} + + +def get_service_account_key( + service_account_json: dict | str | None = None, + service_account_json_path: str | None = None, +) -> dict[str, str] | None: + """ + Return Yandex Cloud Service Account key loaded from JSON string or file. 
+ + :param service_account_json: Service Account JSON key or dict + :param service_account_json_path: Service Account JSON key file path + :return: Yandex Cloud Service Account key + """ + if service_account_json_path: + with open(service_account_json_path) as infile: + service_account_json = infile.read() + + if isinstance(service_account_json, dict): + return service_account_json + if service_account_json: + return json.loads(service_account_json) + + return None + + +def get_service_account_id( + service_account_json: dict | str | None = None, + service_account_json_path: str | None = None, +) -> str | None: + """ + Return Yandex Cloud Service Account ID loaded from JSON string or file. + + :param service_account_json: Service Account JSON key or dict + :param service_account_json_path: Service Account JSON key file path + :return: Yandex Cloud Service Account ID + """ + sa_key = get_service_account_key( + service_account_json=service_account_json, + service_account_json_path=service_account_json_path, + ) + if sa_key: + return sa_key.get("service_account_id") + return None diff --git a/airflow/providers/yandex/utils/defaults.py b/airflow/providers/yandex/utils/defaults.py new file mode 100644 index 0000000000000..9fac3ee845f48 --- /dev/null +++ b/airflow/providers/yandex/utils/defaults.py @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +conn_name_attr = "yandex_conn_id" +default_conn_name = "yandexcloud_default" +conn_type = "yandexcloud" +hook_name = "Yandex Cloud" diff --git a/airflow/providers/yandex/utils/fields.py b/airflow/providers/yandex/utils/fields.py new file mode 100644 index 0000000000000..27cfc0b3b816d --- /dev/null +++ b/airflow/providers/yandex/utils/fields.py @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import Any + + +def get_field_from_extras(extras: dict[str, Any], field_name: str, default: Any = None) -> Any: + """ + Get field from extras, first checking short name, then for backcompat checking for prefixed name. 
+ + :param extras: Dictionary with extras keys + :param field_name: Field name to get from extras + :param default: Default value if field not found + :return: Field value or default if not found + """ + backcompat_prefix = "extra__yandexcloud__" + if field_name.startswith("extra__"): + raise ValueError( + f"Got prefixed name {field_name}; please remove the '{backcompat_prefix}' prefix " + "when using this function." + ) + if field_name in extras: + return extras[field_name] + prefixed_name = f"{backcompat_prefix}{field_name}" + if prefixed_name in extras: + return extras[prefixed_name] + return default diff --git a/airflow/providers/yandex/utils/user_agent.py b/airflow/providers/yandex/utils/user_agent.py new file mode 100644 index 0000000000000..08bb8e467f112 --- /dev/null +++ b/airflow/providers/yandex/utils/user_agent.py @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +import warnings + +from airflow.providers.yandex.utils.defaults import conn_type, hook_name + + +def provider_user_agent() -> str | None: + """Construct User-Agent from Airflow core & provider package versions.""" + from airflow import __version__ as airflow_version + from airflow.configuration import conf + from airflow.providers_manager import ProvidersManager + + try: + manager = ProvidersManager() + provider_name = manager.hooks[conn_type].package_name # type: ignore[union-attr] + provider = manager.providers[provider_name] + return " ".join( + ( + conf.get("yandex", "sdk_user_agent_prefix", fallback=""), + f"apache-airflow/{airflow_version}", + f"{provider_name}/{provider.version}", + ) + ).strip() + except KeyError: + warnings.warn( + f"Hook '{hook_name}' info is not initialized in airflow.ProviderManager", + UserWarning, + stacklevel=2, + ) + + return None diff --git a/tests/providers/yandex/hooks/test_yandex.py b/tests/providers/yandex/hooks/test_yandex.py index 23b460dabfaba..1ba8c800c1260 100644 --- a/tests/providers/yandex/hooks/test_yandex.py +++ b/tests/providers/yandex/hooks/test_yandex.py @@ -19,32 +19,30 @@ import os from unittest import mock -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock import pytest -from airflow.exceptions import AirflowException from airflow.providers.yandex.hooks.yandex import YandexCloudBaseHook from tests.test_utils.config import conf_vars class TestYandexHook: @mock.patch("airflow.hooks.base.BaseHook.get_connection") - @mock.patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials") - def test_client_created_without_exceptions(self, get_credentials_mock, get_connection_mock): + @mock.patch("airflow.providers.yandex.utils.credentials.get_credentials") + def test_client_created_without_exceptions(self, mock_get_credentials, mock_get_connection): """tests `init` method to validate client creation when all parameters are passed""" 
- # Inputs to constructor default_folder_id = "test_id" default_public_ssh_key = "test_key" extra_dejson = '{"extras": "extra"}' - get_connection_mock["extra_dejson"] = "sdsd" - get_connection_mock.extra_dejson = '{"extras": "extra"}' - get_connection_mock.return_value = mock.Mock( + mock_get_connection["extra_dejson"] = "sdsd" + mock_get_connection.extra_dejson = '{"extras": "extra"}' + mock_get_connection.return_value = mock.Mock( connection_id="yandexcloud_default", extra_dejson=extra_dejson ) - get_credentials_mock.return_value = {"token": 122323} + mock_get_credentials.return_value = {"token": 122323} hook = YandexCloudBaseHook( yandex_conn_id=None, @@ -54,41 +52,54 @@ def test_client_created_without_exceptions(self, get_credentials_mock, get_conne assert hook.client is not None @mock.patch("airflow.hooks.base.BaseHook.get_connection") - def test_get_credentials_raise_exception(self, get_connection_mock): - """tests 'get_credentials' method raising exception if none of the required fields are passed.""" + @mock.patch("airflow.providers.yandex.utils.credentials.get_credentials") + def test_provider_user_agent(self, mock_get_credentials, mock_get_connection): + mock_get_connection.return_value = mock.Mock(connection_id="yandexcloud_default", extra_dejson="{}") + mock_get_credentials.return_value = {"token": 122323} + sdk_prefix = "MyAirflow" - # Inputs to constructor - default_folder_id = "test_id" - default_public_ssh_key = "test_key" + with conf_vars({("yandex", "sdk_user_agent_prefix"): sdk_prefix}): + hook = YandexCloudBaseHook() + assert hook.provider_user_agent().startswith(sdk_prefix) - extra_dejson = '{"extras": "extra"}' - get_connection_mock["extra_dejson"] = "sdsd" - get_connection_mock.extra_dejson = '{"extras": "extra"}' - get_connection_mock.return_value = mock.Mock( - connection_id="yandexcloud_default", extra_dejson=extra_dejson - ) + @mock.patch("airflow.hooks.base.BaseHook.get_connection") + 
@mock.patch("airflow.providers.yandex.utils.credentials.get_credentials") + def test_sdk_user_agent(self, mock_get_credentials, mock_get_connection): + mock_get_connection.return_value = mock.Mock(connection_id="yandexcloud_default", extra_dejson="{}") + mock_get_credentials.return_value = {"token": 122323} + sdk_prefix = "MyAirflow" + + with conf_vars({("yandex", "sdk_user_agent_prefix"): sdk_prefix}): + hook = YandexCloudBaseHook() + assert hook.sdk._channels._client_user_agent.startswith(sdk_prefix) - with pytest.raises(AirflowException): - YandexCloudBaseHook( - yandex_conn_id=None, - default_folder_id=default_folder_id, - default_public_ssh_key=default_public_ssh_key, - ) + @pytest.mark.parametrize( + "uri", + [ + pytest.param( + "a://?extra__yandexcloud__folder_id=abc&extra__yandexcloud__public_ssh_key=abc", id="prefix" + ), + pytest.param("a://?folder_id=abc&public_ssh_key=abc", id="no-prefix"), + ], + ) + @mock.patch("airflow.providers.yandex.utils.credentials.get_credentials", new=MagicMock()) + def test_backcompat_prefix_works(self, uri): + with mock.patch.dict(os.environ, {"AIRFLOW_CONN_MY_CONN": uri}): + hook = YandexCloudBaseHook("my_conn") + assert hook.default_folder_id == "abc" + assert hook.default_public_ssh_key == "abc" @mock.patch("airflow.hooks.base.BaseHook.get_connection") - @mock.patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials") - def test_get_field(self, get_credentials_mock, get_connection_mock): - # Inputs to constructor + @mock.patch("airflow.providers.yandex.utils.credentials.get_credentials") + def test_get_endpoint_specified(self, mock_get_credentials, mock_get_connection): default_folder_id = "test_id" default_public_ssh_key = "test_key" - extra_dejson = {"one": "value_one"} - get_connection_mock["extra_dejson"] = "sdsd" - get_connection_mock.extra_dejson = '{"extras": "extra"}' - get_connection_mock.return_value = mock.Mock( + extra_dejson = {"endpoint": "my_endpoint", "something_else": 
"some_value"} + mock_get_connection.return_value = mock.Mock( connection_id="yandexcloud_default", extra_dejson=extra_dejson ) - get_credentials_mock.return_value = {"token": 122323} + mock_get_credentials.return_value = {"token": 122323} hook = YandexCloudBaseHook( yandex_conn_id=None, @@ -96,20 +107,19 @@ def test_get_field(self, get_credentials_mock, get_connection_mock): default_public_ssh_key=default_public_ssh_key, ) - assert hook._get_field("one") == "value_one" + assert hook._get_endpoint() == {"endpoint": "my_endpoint"} @mock.patch("airflow.hooks.base.BaseHook.get_connection") - @mock.patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials") - def test_get_endpoint_specified(self, get_credentials_mock, get_connection_mock): - # Inputs to constructor + @mock.patch("airflow.providers.yandex.utils.credentials.get_credentials") + def test_get_endpoint_unspecified(self, mock_get_credentials, mock_get_connection): default_folder_id = "test_id" default_public_ssh_key = "test_key" - extra_dejson = {"endpoint": "my_endpoint", "something_else": "some_value"} - get_connection_mock.return_value = mock.Mock( + extra_dejson = {"something_else": "some_value"} + mock_get_connection.return_value = mock.Mock( connection_id="yandexcloud_default", extra_dejson=extra_dejson ) - get_credentials_mock.return_value = {"token": 122323} + mock_get_credentials.return_value = {"token": 122323} hook = YandexCloudBaseHook( yandex_conn_id=None, @@ -117,52 +127,50 @@ def test_get_endpoint_specified(self, get_credentials_mock, get_connection_mock) default_public_ssh_key=default_public_ssh_key, ) - assert hook._get_endpoint() == {"endpoint": "my_endpoint"} + assert hook._get_endpoint() == {} @mock.patch("airflow.hooks.base.BaseHook.get_connection") - @mock.patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials") - def test_get_endpoint_unspecified(self, get_credentials_mock, get_connection_mock): - # Inputs to constructor + def 
test__get_field(self, mock_get_connection): + field_name = "one" + field_value = "value_one" default_folder_id = "test_id" default_public_ssh_key = "test_key" + extra_dejson = {field_name: field_value} - extra_dejson = {"something_else": "some_value"} - get_connection_mock.return_value = mock.Mock( + mock_get_connection["extra_dejson"] = "sdsd" + mock_get_connection.extra_dejson = '{"extras": "extra"}' + mock_get_connection.return_value = mock.Mock( connection_id="yandexcloud_default", extra_dejson=extra_dejson ) - get_credentials_mock.return_value = {"token": 122323} hook = YandexCloudBaseHook( yandex_conn_id=None, default_folder_id=default_folder_id, default_public_ssh_key=default_public_ssh_key, ) + res = hook._get_field( + field_name=field_name, + ) - assert hook._get_endpoint() == {} + assert res == field_value @mock.patch("airflow.hooks.base.BaseHook.get_connection") - @mock.patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials") - def test_sdk_user_agent(self, get_credentials_mock, get_connection_mock): - get_connection_mock.return_value = mock.Mock(connection_id="yandexcloud_default", extra_dejson="{}") - get_credentials_mock.return_value = {"token": 122323} - sdk_prefix = "MyAirflow" + def test__get_field_extras_not_found(self, get_connection_mock): + field_name = "some_field" + default = "some_default" + extra_dejson = '{"extras": "extra"}' - with conf_vars({("yandex", "sdk_user_agent_prefix"): sdk_prefix}): - hook = YandexCloudBaseHook() - assert hook.sdk._channels._client_user_agent.startswith(sdk_prefix) + get_connection_mock["extra_dejson"] = "sdsd" + get_connection_mock.extra_dejson = '{"extras": "extra"}' + get_connection_mock.return_value = mock.Mock( + connection_id="yandexcloud_default", extra_dejson=extra_dejson + ) - @pytest.mark.parametrize( - "uri", - [ - pytest.param( - "a://?extra__yandexcloud__folder_id=abc&extra__yandexcloud__public_ssh_key=abc", id="prefix" - ), - 
pytest.param("a://?folder_id=abc&public_ssh_key=abc", id="no-prefix"), - ], - ) - @patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials", new=MagicMock()) - def test_backcompat_prefix_works(self, uri): - with patch.dict(os.environ, {"AIRFLOW_CONN_MY_CONN": uri}): - hook = YandexCloudBaseHook("my_conn") - assert hook.default_folder_id == "abc" - assert hook.default_public_ssh_key == "abc" + hook = YandexCloudBaseHook() + delattr(hook, "extras") + res = hook._get_field( + field_name=field_name, + default=default, + ) + + assert res == default diff --git a/tests/providers/yandex/hooks/test_yandexcloud_dataproc.py b/tests/providers/yandex/hooks/test_yandexcloud_dataproc.py index cf436a9b8c02d..71b94dcd57466 100644 --- a/tests/providers/yandex/hooks/test_yandexcloud_dataproc.py +++ b/tests/providers/yandex/hooks/test_yandexcloud_dataproc.py @@ -17,19 +17,10 @@ from __future__ import annotations import json -from unittest.mock import patch - -import pytest +from unittest import mock from airflow.models import Connection - -try: - import yandexcloud - - from airflow.providers.yandex.hooks.yandexcloud_dataproc import DataprocHook -except ImportError: - yandexcloud = None - +from airflow.providers.yandex.hooks.yandexcloud_dataproc import DataprocHook # Airflow connection with type "yandexcloud" must be created CONNECTION_ID = "yandexcloud_default" @@ -61,23 +52,22 @@ "cFDe6faKCxH6iDRteo4D8L8BxwzN42uZSB0nfmjkIxFTcEU3mFSXEbWByg78aoddMrAAjatyrhH1pON6P0=" ] -# If Yandex.Cloud credentials are set than full test will be run. Otherwise only mocked tests. +# If Yandex.Cloud credentials are set than full test will be run. Otherwise, only mocked tests. 
HAS_CREDENTIALS = OAUTH_TOKEN != "my_oauth_token" -@pytest.mark.skipif(yandexcloud is None, reason="Skipping Yandex.Cloud hook test: no yandexcloud module") class TestYandexCloudDataprocHook: def _init_hook(self): - with patch("airflow.hooks.base.BaseHook.get_connection") as get_connection_mock: - get_connection_mock.return_value = self.connection + with mock.patch("airflow.hooks.base.BaseHook.get_connection") as mock_get_connection: + mock_get_connection.return_value = self.connection self.hook = DataprocHook() def setup_method(self): self.connection = Connection(extra=json.dumps({"oauth": OAUTH_TOKEN})) self._init_hook() - @patch("yandexcloud.SDK.create_operation_and_get_result") - def test_create_dataproc_cluster_mocked(self, create_operation_mock): + @mock.patch("yandexcloud.SDK.create_operation_and_get_result") + def test_create_dataproc_cluster_mocked(self, mock_create_operation): self._init_hook() self.hook.client.create_cluster( @@ -90,16 +80,16 @@ def test_create_dataproc_cluster_mocked(self, create_operation_mock): cluster_image_version=CLUSTER_IMAGE_VERSION, service_account_id=SERVICE_ACCOUNT_ID, ) - assert create_operation_mock.called + assert mock_create_operation.called - @patch("yandexcloud.SDK.create_operation_and_get_result") - def test_delete_dataproc_cluster_mocked(self, create_operation_mock): + @mock.patch("yandexcloud.SDK.create_operation_and_get_result") + def test_delete_dataproc_cluster_mocked(self, mock_create_operation): self._init_hook() self.hook.client.delete_cluster("my_cluster_id") - assert create_operation_mock.called + assert mock_create_operation.called - @patch("yandexcloud.SDK.create_operation_and_get_result") - def test_create_hive_job_hook(self, create_operation_mock): + @mock.patch("yandexcloud.SDK.create_operation_and_get_result") + def test_create_hive_job_hook(self, mock_create_operation): self._init_hook() self.hook.client.create_hive_job( @@ -110,10 +100,10 @@ def test_create_hive_job_hook(self, create_operation_mock): 
query="SELECT 1;", script_variables=None, ) - assert create_operation_mock.called + assert mock_create_operation.called - @patch("yandexcloud.SDK.create_operation_and_get_result") - def test_create_mapreduce_job_hook(self, create_operation_mock): + @mock.patch("yandexcloud.SDK.create_operation_and_get_result") + def test_create_mapreduce_job_hook(self, mock_create_operation): self._init_hook() self.hook.client.create_mapreduce_job( @@ -145,10 +135,10 @@ def test_create_mapreduce_job_hook(self, create_operation_mock): "mapreduce.job.maps": "6", }, ) - assert create_operation_mock.called + assert mock_create_operation.called - @patch("yandexcloud.SDK.create_operation_and_get_result") - def test_create_spark_job_hook(self, create_operation_mock): + @mock.patch("yandexcloud.SDK.create_operation_and_get_result") + def test_create_spark_job_hook(self, mock_create_operation): self._init_hook() self.hook.client.create_spark_job( @@ -170,10 +160,10 @@ def test_create_spark_job_hook(self, create_operation_mock): name="Spark job", properties={"spark.submit.deployMode": "cluster"}, ) - assert create_operation_mock.called + assert mock_create_operation.called - @patch("yandexcloud.SDK.create_operation_and_get_result") - def test_create_pyspark_job_hook(self, create_operation_mock): + @mock.patch("yandexcloud.SDK.create_operation_and_get_result") + def test_create_pyspark_job_hook(self, mock_create_operation): self._init_hook() self.hook.client.create_pyspark_job( @@ -194,4 +184,4 @@ def test_create_pyspark_job_hook(self, create_operation_mock): properties={"spark.submit.deployMode": "cluster"}, python_file_uris=["s3a://some-in-bucket/jobs/sources/pyspark-001/geonames.py"], ) - assert create_operation_mock.called + assert mock_create_operation.called diff --git a/tests/providers/yandex/operators/test_yandexcloud_dataproc.py b/tests/providers/yandex/operators/test_yandexcloud_dataproc.py index 879645daf60cb..083e3d538ac04 100644 --- 
a/tests/providers/yandex/operators/test_yandexcloud_dataproc.py +++ b/tests/providers/yandex/operators/test_yandexcloud_dataproc.py @@ -76,10 +76,10 @@ def setup_method(self): schedule="@daily", ) - @patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials") + @patch("airflow.providers.yandex.utils.credentials.get_credentials") @patch("airflow.hooks.base.BaseHook.get_connection") @patch("yandexcloud._wrappers.dataproc.Dataproc.create_cluster") - def test_create_cluster(self, create_cluster_mock, *_): + def test_create_cluster(self, mock_create_cluster, *_): operator = DataprocCreateClusterOperator( task_id="create_cluster", ssh_public_keys=SSH_PUBLIC_KEYS, @@ -93,7 +93,7 @@ def test_create_cluster(self, create_cluster_mock, *_): ) context = {"task_instance": MagicMock()} operator.execute(context) - create_cluster_mock.assert_called_once_with( + mock_create_cluster.assert_called_once_with( cluster_description="", cluster_image_version="1.4", cluster_name=None, @@ -135,15 +135,15 @@ def test_create_cluster(self, create_cluster_mock, *_): ) context["task_instance"].xcom_push.assert_has_calls( [ - call(key="cluster_id", value=create_cluster_mock().response.id), + call(key="cluster_id", value=mock_create_cluster().response.id), call(key="yandexcloud_connection_id", value=CONNECTION_ID), ] ) - @patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials") + @patch("airflow.providers.yandex.utils.credentials.get_credentials") @patch("airflow.hooks.base.BaseHook.get_connection") @patch("yandexcloud._wrappers.dataproc.Dataproc.delete_cluster") - def test_delete_cluster_operator(self, delete_cluster_mock, *_): + def test_delete_cluster_operator(self, mock_delete_cluster, *_): operator = DataprocDeleteClusterOperator( task_id="delete_cluster", connection_id=CONNECTION_ID, @@ -152,12 +152,12 @@ def test_delete_cluster_operator(self, delete_cluster_mock, *_): context["task_instance"].xcom_pull.return_value = "my_cluster_id" 
operator.execute(context) context["task_instance"].xcom_pull.assert_called_once_with(key="cluster_id") - delete_cluster_mock.assert_called_once_with("my_cluster_id") + mock_delete_cluster.assert_called_once_with("my_cluster_id") - @patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials") + @patch("airflow.providers.yandex.utils.credentials.get_credentials") @patch("airflow.hooks.base.BaseHook.get_connection") @patch("yandexcloud._wrappers.dataproc.Dataproc.create_hive_job") - def test_create_hive_job_operator(self, create_hive_job_mock, *_): + def test_create_hive_job_operator(self, mock_create_hive_job, *_): operator = DataprocCreateHiveJobOperator( task_id="create_hive_job", query="SELECT 1;", @@ -173,7 +173,7 @@ def test_create_hive_job_operator(self, create_hive_job_mock, *_): ] ) - create_hive_job_mock.assert_called_once_with( + mock_create_hive_job.assert_called_once_with( cluster_id="my_cluster_id", continue_on_failure=False, name="Hive job", @@ -183,10 +183,10 @@ def test_create_hive_job_operator(self, create_hive_job_mock, *_): script_variables=None, ) - @patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials") + @patch("airflow.providers.yandex.utils.credentials.get_credentials") @patch("airflow.hooks.base.BaseHook.get_connection") @patch("yandexcloud._wrappers.dataproc.Dataproc.create_mapreduce_job") - def test_create_mapreduce_job_operator(self, create_mapreduce_job_mock, *_): + def test_create_mapreduce_job_operator(self, mock_create_mapreduce_job, *_): operator = DataprocCreateMapReduceJobOperator( task_id="run_mapreduce_job", main_class="org.apache.hadoop.streaming.HadoopStreaming", @@ -223,7 +223,7 @@ def test_create_mapreduce_job_operator(self, create_mapreduce_job_mock, *_): ] ) - create_mapreduce_job_mock.assert_called_once_with( + mock_create_mapreduce_job.assert_called_once_with( archive_uris=None, args=[ "-mapper", @@ -253,10 +253,10 @@ def test_create_mapreduce_job_operator(self, 
create_mapreduce_job_mock, *_): }, ) - @patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials") + @patch("airflow.providers.yandex.utils.credentials.get_credentials") @patch("airflow.hooks.base.BaseHook.get_connection") @patch("yandexcloud._wrappers.dataproc.Dataproc.create_spark_job") - def test_create_spark_job_operator(self, create_spark_job_mock, *_): + def test_create_spark_job_operator(self, mock_create_spark_job, *_): operator = DataprocCreateSparkJobOperator( task_id="create_spark_job", main_jar_file_uri="s3a://data-proc-public/jobs/sources/java/dataproc-examples-1.0.jar", @@ -292,7 +292,7 @@ def test_create_spark_job_operator(self, create_spark_job_mock, *_): ] ) - create_spark_job_mock.assert_called_once_with( + mock_create_spark_job.assert_called_once_with( archive_uris=["s3a://some-in-bucket/jobs/sources/data/country-codes.csv.zip"], args=[ "s3a://some-in-bucket/jobs/sources/data/cities500.txt.bz2", @@ -315,10 +315,10 @@ def test_create_spark_job_operator(self, create_spark_job_mock, *_): exclude_packages=None, ) - @patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials") + @patch("airflow.providers.yandex.utils.credentials.get_credentials") @patch("airflow.hooks.base.BaseHook.get_connection") @patch("yandexcloud._wrappers.dataproc.Dataproc.create_pyspark_job") - def test_create_pyspark_job_operator(self, create_pyspark_job_mock, *_): + def test_create_pyspark_job_operator(self, mock_create_pyspark_job, *_): operator = DataprocCreatePysparkJobOperator( task_id="create_pyspark_job", main_python_file_uri="s3a://some-in-bucket/jobs/sources/pyspark-001/main.py", @@ -355,7 +355,7 @@ def test_create_pyspark_job_operator(self, create_pyspark_job_mock, *_): ] ) - create_pyspark_job_mock.assert_called_once_with( + mock_create_pyspark_job.assert_called_once_with( archive_uris=["s3a://some-in-bucket/jobs/sources/data/country-codes.csv.zip"], args=[ "s3a://some-in-bucket/jobs/sources/data/cities500.txt.bz2", 
diff --git a/tests/providers/yandex/utils/__init__.py b/tests/providers/yandex/utils/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/providers/yandex/utils/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/yandex/utils/test_credentials.py b/tests/providers/yandex/utils/test_credentials.py new file mode 100644 index 0000000000000..5bc1e174903b0 --- /dev/null +++ b/tests/providers/yandex/utils/test_credentials.py @@ -0,0 +1,168 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import json +from unittest import mock + +from airflow.providers.yandex.utils.credentials import ( + get_credentials, + get_service_account_id, + get_service_account_key, +) + + +def test_get_credentials_oauth_token(): + oauth_token = "y3_Vdheub7w9bIut67GHeL345gfb5GAnd3dZnf08FRbvjeUFvetYiohGvc" + service_account_key = { + "id": "...", + "service_account_id": "...", + "private_key": "...", + } + service_account_key_json = json.dumps(service_account_key) + service_account_file_path = "/home/airflow/authorized_key.json" + expected = {"token": oauth_token} + + res = get_credentials( + oauth_token=oauth_token, + service_account_json=service_account_key_json, + service_account_json_path=service_account_file_path, + ) + + assert res == expected + + +@mock.patch("airflow.providers.yandex.utils.credentials.get_service_account_key") +def test_get_credentials_service_account_key(mock_get_service_account_key): + service_account_key = { + "id": "...", + "service_account_id": "...", + "private_key": "...", + } + service_account_key_json = json.dumps(service_account_key) + service_account_file_path = "/home/airflow/authorized_key.json" + expected = {"service_account_key": service_account_key} + + mock_get_service_account_key.return_value = service_account_key + + res = get_credentials( + service_account_json=service_account_key_json, + service_account_json_path=service_account_file_path, + ) + + assert res == expected + + +def test_get_credentials_metadata_service(caplog): + expected = {} + + res = get_credentials() + + assert res == expected + assert "using metadata service as credentials" in caplog.text + + +def test_get_service_account_key(): + service_account_key = { + "id": "...", + "service_account_id": "...", + "private_key": "...", + } + service_account_key_json = json.dumps(service_account_key) + expected = service_account_key + 
+ res = get_service_account_key( + service_account_json=service_account_key_json, + ) + + assert res == expected + + +def test_get_service_account_dict(): + service_account_key = { + "id": "...", + "service_account_id": "...", + "private_key": "...", + } + expected = service_account_key + + res = get_service_account_key( + service_account_json=service_account_key, + ) + + assert res == expected + + +def test_get_service_account_key_file(tmp_path): + service_account_key = { + "id": "...", + "service_account_id": "...", + "private_key": "...", + } + service_account_key_json = json.dumps(service_account_key) + service_account_file = tmp_path / "authorized_key.json" + service_account_file.write_text(service_account_key_json) + service_account_file_path = str(service_account_file) + expected = service_account_key + + res = get_service_account_key( + service_account_json=service_account_key_json, + service_account_json_path=service_account_file_path, + ) + + assert res == expected + + +def test_get_service_account_key_none(): + expected = None + + res = get_service_account_key() + + assert res == expected + + +@mock.patch("airflow.providers.yandex.utils.credentials.get_service_account_key") +def test_get_service_account_id(mock_get_service_account_key): + service_account_id = "this_is_service_account_id" + service_account_key = { + "id": "...", + "service_account_id": service_account_id, + "private_key": "...", + } + service_account_key_json = json.dumps(service_account_key) + service_account_file_path = "/home/airflow/authorized_key.json" + expected = service_account_id + + mock_get_service_account_key.return_value = service_account_key + + res = get_service_account_id( + service_account_json=service_account_key_json, + service_account_json_path=service_account_file_path, + ) + + assert res == expected + + +@mock.patch("airflow.providers.yandex.utils.credentials.get_service_account_key") +def test_get_service_account_id_none(mock_get_service_account_key): + expected = 
None + + mock_get_service_account_key.return_value = None + + res = get_service_account_id() + + assert res == expected diff --git a/tests/providers/yandex/utils/test_defaults.py b/tests/providers/yandex/utils/test_defaults.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/providers/yandex/utils/test_defaults.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/yandex/utils/test_fields.py b/tests/providers/yandex/utils/test_fields.py new file mode 100644 index 0000000000000..3b422823242a5 --- /dev/null +++ b/tests/providers/yandex/utils/test_fields.py @@ -0,0 +1,83 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pytest + +from airflow.providers.yandex.utils.fields import get_field_from_extras + + +def test_get_field_from_extras(): + field_name = "somefield" + default = None + expected = "somevalue" + extras = { + field_name: expected, + } + + res = get_field_from_extras( + extras=extras, + field_name=field_name, + default=default, + ) + + assert res == expected + + +def test_get_field_from_extras_not_found(): + field_name = "somefield" + default = "default" + expected = default + extras = {} + + res = get_field_from_extras( + extras=extras, + field_name=field_name, + default=default, + ) + + assert res == expected + + +def test_get_field_from_extras_prefixed_in_extra(): + field_name = "somefield" + default = None + expected = "somevalue" + extras = { + f"extra__yandexcloud__{field_name}": expected, + } + + res = get_field_from_extras( + extras=extras, + field_name=field_name, + default=default, + ) + + assert res == expected + + +def test_get_field_from_extras_field_name_with_extra_raise_exception(): + field_name = "extra__yandexcloud__fieldname" + default = None + extras = {} + + with pytest.raises(ValueError): + get_field_from_extras( + extras=extras, + field_name=field_name, + default=default, + ) diff --git a/tests/providers/yandex/utils/test_user_agent.py b/tests/providers/yandex/utils/test_user_agent.py new file mode 100644 index 0000000000000..854549e391d02 --- /dev/null +++ b/tests/providers/yandex/utils/test_user_agent.py @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more 
contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest import mock + +from airflow.providers.yandex.utils.user_agent import provider_user_agent + + +def test_provider_user_agent(): + user_agent = provider_user_agent() + + from airflow import __version__ as airflow_version + + user_agent_airflow = f"apache-airflow/{airflow_version}" + assert user_agent_airflow in user_agent + + from airflow.providers_manager import ProvidersManager + + manager = ProvidersManager() + provider_name = manager.hooks["yandexcloud"].package_name + provider = manager.providers[provider_name] + user_agent_provider = f"{provider_name}/{provider.version}" + assert user_agent_provider in user_agent + + from airflow.configuration import conf + + user_agent_prefix = conf.get("yandex", "sdk_user_agent_prefix", fallback="") + assert user_agent_prefix in user_agent + + +@mock.patch("airflow.providers_manager.ProvidersManager.hooks") +def test_provider_user_agent_hook_not_exists(mock_hooks): + mock_hooks.return_value = [] + + user_agent = provider_user_agent() + + assert user_agent is None From 21a61aa4f38893dcc7d1d98de6cc09bbfd8c8923 Mon Sep 17 00:00:00 2001 From: Vadim Vladimirov Date: Wed, 10 Jan 2024 22:40:38 +0700 Subject: [PATCH 2/4] docs: using metadata service in Yandex.Cloud 
Connection --- .../connections/yandexcloud.rst | 67 ++++++++++++++----- 1 file changed, 50 insertions(+), 17 deletions(-) diff --git a/docs/apache-airflow-providers-yandex/connections/yandexcloud.rst b/docs/apache-airflow-providers-yandex/connections/yandexcloud.rst index 4de36ae04348a..c67676d82865d 100644 --- a/docs/apache-airflow-providers-yandex/connections/yandexcloud.rst +++ b/docs/apache-airflow-providers-yandex/connections/yandexcloud.rst @@ -15,39 +15,31 @@ specific language governing permissions and limitations under the License. +.. _yandex_cloud_connection: Yandex.Cloud Connection -================================ +======================= The Yandex.Cloud connection type enables the authentication in Yandex.Cloud services. -Authenticating to Yandex.Cloud ---------------------------------- - -Normally service account keys are used for Yandex.Cloud API authentication. -https://cloud.yandex.com/docs/cli/operations/authentication/service-account - -As an alternative to service account key, user OAuth token can be used for authentication. -See the https://cloud.yandex.com/docs/cli/quickstart for obtaining a user OAuth token. - -Default Connection IDs ----------------------- - -All hooks and operators related to Yandex.Cloud use ``yandexcloud_default`` connection by default. - Configuring the Connection -------------------------- Service account auth JSON - JSON object as a string like:: - {"id", "...", "service_account_id": "...", "private_key": "..."} + JSON object as a string. + + Example: ``{"id": "...", "service_account_id": "...", "private_key": "..."}`` Service account auth JSON file path Path to the file containing service account auth JSON. + Example: ``/home/airflow/authorized_key.json`` + OAuth Token OAuth token as a string. + Example: ``y3_Vdheub7w9bIut67GHeL345gfb5GAnd3dZnf08FRbvjeUFvetYiohGvc`` + SSH public key (optional) The key will be placed to all created Compute nodes, allowing to have a root shell there. 
@@ -55,8 +47,49 @@ Folder ID (optional) Folder is an entity to separate different projects within the cloud. If specified, this ID will be used by default during creation of nodes and clusters. + See https://cloud.yandex.com/docs/resource-manager/operations/folder/get-id for details Endpoint (optional) Set API endpoint + See https://github.com/yandex-cloud/python-sdk for default + +Default Connection IDs +---------------------- + +All hooks and operators related to Yandex.Cloud use ``yandexcloud_default`` connection by default. + +Authenticating to Yandex.Cloud +------------------------------ + +Using Authorized keys for authorization as service account +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Before you start, make sure you have `created `__ +a Yandex Cloud `Service Account `__ +with the permissions ``lockbox.viewer`` and ``lockbox.payloadViewer``. + +First, you need to create `Authorized key `__ +for your service account and save the generated JSON file with public and private key parts. + +Then you need to specify the key in the ``Service account auth JSON`` field. + +Alternatively, you can specify the path to JSON file in the ``Service account auth JSON file path`` field. + +Using OAuth token for authorization as user account +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +First, you need to create `OAuth token `__ for user account. +It will look like ``y3_Vdheub7w9bIut67GHeL345gfb5GAnd3dZnf08FRbvjeUFvetYiohGvc``. + +Then you need to specify the token in the ``OAuth Token`` field. + +Using metadata service +~~~~~~~~~~~~~~~~~~~~~~ + +If no credentials are specified, the connection will attempt to use +the `metadata service `__ for authentication. + +To do this, you need to `link `__ +your service account with your VM. 
From 7c88d2689520deba26fef6f1f4e9dea2f477f5d7 Mon Sep 17 00:00:00 2001 From: Vadim Vladimirov Date: Mon, 15 Jan 2024 16:47:11 +0700 Subject: [PATCH 3/4] feat: Yandex Cloud Lockbox Secret Backend --- airflow/providers/yandex/secrets/__init__.py | 16 + airflow/providers/yandex/secrets/lockbox.py | 280 +++++++++++ tests/providers/yandex/secrets/__init__.py | 16 + .../providers/yandex/secrets/test_lockbox.py | 435 ++++++++++++++++++ 4 files changed, 747 insertions(+) create mode 100644 airflow/providers/yandex/secrets/__init__.py create mode 100644 airflow/providers/yandex/secrets/lockbox.py create mode 100644 tests/providers/yandex/secrets/__init__.py create mode 100644 tests/providers/yandex/secrets/test_lockbox.py diff --git a/airflow/providers/yandex/secrets/__init__.py b/airflow/providers/yandex/secrets/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow/providers/yandex/secrets/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
diff --git a/airflow/providers/yandex/secrets/lockbox.py b/airflow/providers/yandex/secrets/lockbox.py new file mode 100644 index 0000000000000..adbf994873208 --- /dev/null +++ b/airflow/providers/yandex/secrets/lockbox.py @@ -0,0 +1,280 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+"""Objects relating to sourcing secrets from Yandex Cloud Lockbox.""" +from __future__ import annotations + +from functools import cached_property +from typing import Any + +import yandex.cloud.lockbox.v1.payload_pb2 as payload_pb +import yandex.cloud.lockbox.v1.payload_service_pb2 as payload_service_pb +import yandex.cloud.lockbox.v1.payload_service_pb2_grpc as payload_service_pb_grpc +import yandex.cloud.lockbox.v1.secret_pb2 as secret_pb +import yandex.cloud.lockbox.v1.secret_service_pb2 as secret_service_pb +import yandex.cloud.lockbox.v1.secret_service_pb2_grpc as secret_service_pb_grpc +import yandexcloud + +from airflow.models import Connection +from airflow.providers.yandex.utils.credentials import get_credentials +from airflow.providers.yandex.utils.defaults import default_conn_name +from airflow.providers.yandex.utils.fields import get_field_from_extras +from airflow.providers.yandex.utils.user_agent import provider_user_agent +from airflow.secrets import BaseSecretsBackend +from airflow.utils.log.logging_mixin import LoggingMixin + + +class LockboxSecretBackend(BaseSecretsBackend, LoggingMixin): + """ + Retrieves Connection or Variables or Configs from Yandex Lockbox. + + Configurable via ``airflow.cfg`` like so: + + .. code-block:: ini + + [secrets] + backend = airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend + backend_kwargs = {"connections_prefix": "airflow/connections"} + + For example, when ``{"connections_prefix": "airflow/connections"}`` is set, if a secret is defined with + the path ``airflow/connections/smtp_default``, the connection with conn_id ``smtp_default`` would be + accessible. + + When ``{"variables_prefix": "airflow/variables"}`` is set, if a secret is defined with + the path ``airflow/variables/hello``, the variable with the name ``hello`` would be accessible. 
+ + When ``{"config_prefix": "airflow/config"}`` is set, if a secret is defined with + the path ``airflow/config/sql_alchemy_conn``, the config with key ``sql_alchemy_conn`` would be + accessible. + + When the prefix is empty, keys will use the Lockbox Secrets without any prefix. + + .. code-block:: ini + + [secrets] + backend = airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend + backend_kwargs = {"yc_connection_id": "", "folder_id": ""} + + You need to specify credentials or id of yandexcloud connection to connect to Yandex Lockbox with. + Credentials will be used with this priority: + + * OAuth Token + * Service Account JSON file + * Service Account JSON + * Yandex Cloud Connection + + If no credentials specified, default connection id will be used. + + Also, you need to specify the Yandex Cloud folder ID to search for Yandex Lockbox secrets in. + + :param yc_oauth_token: Specifies the user account OAuth token to connect to Yandex Lockbox with. + Looks like ``y3_xxxxx``. + :param yc_sa_key_json: Specifies the service account auth JSON. + Looks like ``{"id": "...", "service_account_id": "...", "private_key": "..."}``. + :param yc_sa_key_json_path: Specifies the service account auth JSON file path. + Looks like ``/home/airflow/authorized_key.json``. + File content looks like ``{"id": "...", "service_account_id": "...", "private_key": "..."}``. + :param yc_connection_id: Specifies the connection ID to connect to Yandex Lockbox with. + Default: "yandexcloud_default" + :param folder_id: Specifies the folder ID to search for Yandex Lockbox secrets in. + If set to None (null in JSON), requests will use the connection folder_id if specified. + :param connections_prefix: Specifies the prefix of the secret to read to get Connections. + If set to None (null in JSON), requests for connections will not be sent to Yandex Lockbox. + Default: "airflow/connections" + :param variables_prefix: Specifies the prefix of the secret to read to get Variables. 
+ If set to None (null in JSON), requests for variables will not be sent to Yandex Lockbox. + Default: "airflow/variables" + :param config_prefix: Specifies the prefix of the secret to read to get Configurations. + If set to None (null in JSON), requests for variables will not be sent to Yandex Lockbox. + Default: "airflow/config" + :param sep: Specifies the separator used to concatenate secret_prefix and secret_id. + Default: "/" + :param endpoint: Specifies an API endpoint. + Leave blank to use default. + """ + + def __init__( + self, + yc_oauth_token: str | None = None, + yc_sa_key_json: dict | str | None = None, + yc_sa_key_json_path: str | None = None, + yc_connection_id: str | None = None, + folder_id: str = "", + connections_prefix: str | None = "airflow/connections", + variables_prefix: str | None = "airflow/variables", + config_prefix: str | None = "airflow/config", + sep: str = "/", + endpoint: str | None = None, + ): + super().__init__() + + self.yc_oauth_token = yc_oauth_token + self.yc_sa_key_json = yc_sa_key_json + self.yc_sa_key_json_path = yc_sa_key_json_path + self.yc_connection_id = None + if not any([yc_oauth_token, yc_sa_key_json, yc_sa_key_json_path]): + self.yc_connection_id = yc_connection_id or default_conn_name + else: + assert ( + yc_connection_id is None + ), "yc_connection_id should not be used if other credentials are specified" + + self.folder_id = folder_id + self.connections_prefix = connections_prefix.rstrip(sep) if connections_prefix is not None else None + self.variables_prefix = variables_prefix.rstrip(sep) if variables_prefix is not None else None + self.config_prefix = config_prefix.rstrip(sep) if config_prefix is not None else None + self.sep = sep + self.endpoint = endpoint + + def get_conn_value(self, conn_id: str) -> str | None: + """ + Retrieve from Secrets Backend a string value representing the Connection object. 
+ + :param conn_id: Connection ID + :return: Connection Value + """ + if self.connections_prefix is None: + return None + + if conn_id == self.yc_connection_id: + return None + + return self._get_secret_value(self.connections_prefix, conn_id) + + def get_variable(self, key: str) -> str | None: + """ + Return value for Airflow Variable. + + :param key: Variable Key + :return: Variable Value + """ + if self.variables_prefix is None: + return None + + return self._get_secret_value(self.variables_prefix, key) + + def get_config(self, key: str) -> str | None: + """ + Return value for Airflow Config Key. + + :param key: Config Key + :return: Config Value + """ + if self.config_prefix is None: + return None + + return self._get_secret_value(self.config_prefix, key) + + @cached_property + def _client(self): + """ + Create a Yandex Cloud SDK client. + + Lazy loading is used here + because we can't establish a Connection until all secrets backends have been initialized. + """ + if self.yc_connection_id: + self.yc_oauth_token = self._get_field("oauth") + self.yc_sa_key_json = self._get_field("service_account_json") + self.yc_sa_key_json_path = self._get_field("service_account_json_path") + self.folder_id = self.folder_id or self._get_field("folder_id") + + credentials = get_credentials( + oauth_token=self.yc_oauth_token, + service_account_json=self.yc_sa_key_json, + service_account_json_path=self.yc_sa_key_json_path, + ) + sdk_config = self._get_endpoint() + return yandexcloud.SDK(user_agent=provider_user_agent(), **credentials, **sdk_config).client + + def _get_endpoint(self) -> dict[str, str]: + sdk_config = {} + + if self.endpoint: + sdk_config["endpoint"] = self.endpoint + + return sdk_config + + @cached_property + def _connection(self) -> Connection | None: + if not self.yc_connection_id: + return None + + conn = Connection.get_connection_from_secrets(self.yc_connection_id) + self.log.info("Using connection ID '%s' for task execution.", conn.conn_id) + + return conn + + 
def _get_field(self, field_name: str, default: Any = None) -> Any: + conn = self._connection + if not conn: + return None + + return get_field_from_extras( + extras=conn.extra_dejson, + field_name=field_name, + default=default, + ) + + def _build_secret_name(self, prefix: str, key: str): + if len(prefix) == 0: + return key + return f"{prefix}{self.sep}{key}" + + def _get_secret_value(self, prefix: str, key: str) -> str | None: + secret: secret_pb.Secret = None + for s in self._get_secrets(): + if s.name == self._build_secret_name(prefix=prefix, key=key): + secret = s + break + if not secret: + return None + + payload = self._get_payload(secret.id, secret.current_version.id) + entries = {entry.key: entry.text_value for entry in payload.entries if entry.text_value} + + if len(entries) == 0: + return None + return sorted(entries.values())[0] + + def _get_secrets(self) -> list[secret_pb.Secret]: + response = self._list_secrets(folder_id=self.folder_id) + + secrets: list[secret_pb.Secret] = response.secrets[:] + next_page_token = response.next_page_token + while next_page_token != "": + response = self._list_secrets( + folder_id=self.folder_id, + page_token=next_page_token, + ) + secrets.extend(response.secrets) + next_page_token = response.next_page_token + + return secrets + + def _get_payload(self, secret_id: str, version_id: str) -> payload_pb.Payload: + request = payload_service_pb.GetPayloadRequest( + secret_id=secret_id, + version_id=version_id, + ) + return self._client(payload_service_pb_grpc.PayloadServiceStub).Get(request) + + def _list_secrets(self, folder_id: str, page_token: str = "") -> secret_service_pb.ListSecretsResponse: + request = secret_service_pb.ListSecretsRequest( + folder_id=folder_id, + page_token=page_token, + ) + return self._client(secret_service_pb_grpc.SecretServiceStub).List(request) diff --git a/tests/providers/yandex/secrets/__init__.py b/tests/providers/yandex/secrets/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 
--- /dev/null +++ b/tests/providers/yandex/secrets/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/yandex/secrets/test_lockbox.py b/tests/providers/yandex/secrets/test_lockbox.py new file mode 100644 index 0000000000000..e51724f866c61 --- /dev/null +++ b/tests/providers/yandex/secrets/test_lockbox.py @@ -0,0 +1,435 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +import json +from unittest.mock import MagicMock, Mock, patch + +import yandex.cloud.lockbox.v1.payload_pb2 as payload_pb +import yandex.cloud.lockbox.v1.secret_pb2 as secret_pb +import yandex.cloud.lockbox.v1.secret_service_pb2 as secret_service_pb + +from airflow.providers.yandex.secrets.lockbox import LockboxSecretBackend +from airflow.providers.yandex.utils.defaults import default_conn_name + + +class TestLockboxSecretBackend: + @patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value") + def test_yandex_lockbox_secret_backend_get_connection(self, mock_get_value): + conn_id = "fake_conn" + conn_type = "scheme" + host = "host" + login = "user" + password = "pass" + port = 100 + uri = f"{conn_type}://{login}:{password}@{host}:{port}" + + mock_get_value.return_value = uri + + conn = LockboxSecretBackend().get_connection(conn_id) + + assert conn.conn_id == conn_id + assert conn.conn_type == conn_type + assert conn.host == host + assert conn.schema == "" + assert conn.login == login + assert conn.password == password + assert conn.port == port + assert conn.get_uri() == uri + + @patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value") + def test_yandex_lockbox_secret_backend_get_connection_from_json(self, mock_get_value): + conn_id = "airflow_to_yandexcloud" + conn_type = "yandex_cloud" + extra = "some extra values" + c = { + "conn_type": conn_type, + "extra": extra, + } + + mock_get_value.return_value = json.dumps(c) + + conn = LockboxSecretBackend().get_connection(conn_id) + + assert conn.conn_id == conn_id + assert conn.conn_type == conn_type + assert conn.extra == extra + + @patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value") + def test_yandex_lockbox_secret_backend_get_variable(self, mock_get_value): + k = "thisiskey" + v = "thisisvalue" + + mock_get_value.return_value = v + + value = LockboxSecretBackend().get_variable(k) + 
+ assert value == v + + @patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value") + def test_yandex_lockbox_secret_backend_get_config(self, mock_get_value): + k = "thisiskey" + v = "thisisvalue" + + mock_get_value.return_value = v + + value = LockboxSecretBackend().get_config(k) + + assert value == v + + @patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value") + def test_yandex_lockbox_secret_backend_get_connection_prefix_is_none(self, mock_get_value): + uri = "scheme://user:pass@host:100" + + mock_get_value.return_value = uri + + conn = LockboxSecretBackend( + connections_prefix=None, + ).get_connection("fake_conn") + + assert conn is None + + @patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value") + def test_yandex_lockbox_secret_backend_get_connection_with_oauth_token_auth(self, mock_get_value): + conn_id = "yandex_cloud" + uri = "scheme://user:pass@host:100" + + mock_get_value.return_value = uri + + conn = LockboxSecretBackend( + yc_oauth_token="y3_Vdheub7w9bIut67GHeL345gfb5GAnd3dZnf08FRbvjeUFvetYiohGvc", + ).get_connection(conn_id) + + assert conn.conn_id == conn_id + assert conn.get_uri() == uri + + @patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value") + def test_yandex_lockbox_secret_backend_get_connection_conn_id_for_backend(self, mock_get_value): + conn_id = "yandex_cloud" + uri = "scheme://user:pass@host:100" + + mock_get_value.return_value = uri + + conn = LockboxSecretBackend( + yc_connection_id=conn_id, + ).get_connection(conn_id) + + assert conn is None + + @patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value") + def test_yandex_lockbox_secret_backend_get_connection_default_conn_id(self, mock_get_value): + conn_id = default_conn_name + uri = "scheme://user:pass@host:100" + + mock_get_value.return_value = uri + + conn = LockboxSecretBackend().get_connection(conn_id) + + assert conn is 
None + + @patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value") + def test_yandex_lockbox_secret_backend_get_variable_prefix_is_none(self, mock_get_value): + k = "thisiskey" + v = "thisisvalue" + + mock_get_value.return_value = v + + value = LockboxSecretBackend( + variables_prefix=None, + ).get_variable(k) + + assert value is None + + @patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value") + def test_yandex_lockbox_secret_backend_get_config_prefix_is_none(self, mock_get_value): + k = "thisiskey" + v = "thisisvalue" + + mock_get_value.return_value = v + + value = LockboxSecretBackend( + config_prefix=None, + ).get_config(k) + + assert value is None + + def test_yandex_lockbox_secret_backend__client_created_without_exceptions(self): + yc_oauth_token = "y3_Vdheub7w9bIut67GHeL345gfb5GAnd3dZnf08FRbvjeUFvetYiohGvc" + + sm = LockboxSecretBackend( + yc_oauth_token=yc_oauth_token, + ) + + assert sm._client is not None + + def test_yandex_lockbox_secret_backedn__get_endpoint(self): + endpoint = "api.cloud.yandex.net" + expected = { + "endpoint": endpoint, + } + + res = LockboxSecretBackend( + endpoint=endpoint, + )._get_endpoint() + + assert res == expected + + def test_yandex_lockbox_secret_backedn__get_endpoint_not_specified(self): + expected = {} + + res = LockboxSecretBackend()._get_endpoint() + + assert res == expected + + def test_yandex_lockbox_secret_backend__build_secret_name(self): + prefix = "thiisprefix" + key = "thisiskey" + expected = "thiisprefix/thisiskey" + + res = LockboxSecretBackend()._build_secret_name(prefix, key) + + assert res == expected + + def test_yandex_lockbox_secret_backend__build_secret_name_no_prefix(self): + prefix = "" + key = "thisiskey" + expected = "thisiskey" + + res = LockboxSecretBackend()._build_secret_name(prefix, key) + + assert res == expected + + def test_yandex_lockbox_secret_backend__build_secret_name_custom_sep(self): + sep = "_" + prefix = "thiisprefix" + 
key = "thisiskey" + expected = "thiisprefix_thisiskey" + + res = LockboxSecretBackend( + sep=sep, + )._build_secret_name(prefix, key) + + assert res == expected + + @patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secrets") + @patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_payload") + def test_yandex_lockbox_secret_backend__get_secret_value(self, mock_get_payload, mock_get_secrets): + target_name = "target_name" + target_text = "target_text" + + mock_get_secrets.return_value = [ + secret_pb.Secret( + id="123", + name="one", + ), + secret_pb.Secret( + id="456", + name=target_name, + ), + secret_pb.Secret( + id="789", + name="two", + ), + ] + mock_get_payload.return_value = payload_pb.Payload( + entries=[ + payload_pb.Payload.Entry(text_value=target_text), + ], + ) + + res = LockboxSecretBackend()._get_secret_value("", target_name) + + assert res == target_text + + @patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secrets") + @patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_payload") + def test_yandex_lockbox_secret_backend__get_secret_value_not_found( + self, mock_get_payload, mock_get_secrets + ): + target_name = "target_name" + target_text = "target_text" + + mock_get_secrets.return_value = [ + secret_pb.Secret( + id="123", + name="one", + ), + secret_pb.Secret( + id="789", + name="two", + ), + ] + mock_get_payload.return_value = payload_pb.Payload( + entries=[ + payload_pb.Payload.Entry(text_value=target_text), + ], + ) + + res = LockboxSecretBackend()._get_secret_value("", target_name) + + assert res is None + + @patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secrets") + @patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_payload") + def test_yandex_lockbox_secret_backend__get_secret_value_no_text_entries( + self, mock_get_payload, mock_get_secrets + ): + target_name = "target_name" + target_value = b"01010101" 
+ + mock_get_secrets.return_value = [ + secret_pb.Secret( + id="123", + name="one", + ), + secret_pb.Secret( + id="456", + name="two", + ), + secret_pb.Secret( + id="789", + name=target_name, + ), + ] + mock_get_payload.return_value = payload_pb.Payload( + entries=[ + payload_pb.Payload.Entry(binary_value=target_value), + ], + ) + + res = LockboxSecretBackend()._get_secret_value("", target_name) + + assert res is None + + @patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._list_secrets") + def test_yandex_lockbox_secret_backend__get_secrets(self, mock_list_secrets): + secrets = secret_service_pb.ListSecretsResponse( + secrets=[ + secret_pb.Secret( + id="123", + ), + secret_pb.Secret( + id="456", + ), + ], + ) + + mock_list_secrets.return_value = secrets + + res = LockboxSecretBackend( + folder_id="someid", + )._get_secrets() + + assert res == secrets.secrets + + @patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._list_secrets") + def test_yandex_lockbox_secret_backend__get_secrets_page_token(self, mock_list_secrets): + first_secrets = secret_service_pb.ListSecretsResponse( + secrets=[ + secret_pb.Secret( + id="123", + ), + secret_pb.Secret( + id="456", + ), + ], + next_page_token="token", + ) + second_secrets = secret_service_pb.ListSecretsResponse( + secrets=[ + secret_pb.Secret( + id="789", + ), + secret_pb.Secret( + id="000", + ), + ], + next_page_token="", + ) + + mock_list_secrets.side_effect = [ + first_secrets, + second_secrets, + ] + + res = LockboxSecretBackend( + folder_id="someid", + )._get_secrets() + + assert res == [*first_secrets.secrets, *second_secrets.secrets] + + @patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._client") + def test_yandex_lockbox_secret_backend__get_payload(self, mock_client): + mock_stub = MagicMock() + mock_response = payload_pb.Payload() + mock_stub.Get.return_value = mock_response + mock_client.return_value = mock_stub + + result = LockboxSecretBackend()._get_payload( 
+ secret_id="test_secret", + version_id="test_version", + ) + + mock_client.assert_called_once() + mock_stub.Get.assert_called_once() + assert result == mock_response + + @patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._client") + def test_yandex_lockbox_secret_backend__list_secrets(self, mock_client): + mock_stub = MagicMock() + mock_response = secret_service_pb.ListSecretsResponse() + mock_stub.List.return_value = mock_response + mock_client.return_value = mock_stub + + result = LockboxSecretBackend()._list_secrets( + folder_id="test_folder", + ) + + mock_client.assert_called_once() + mock_stub.List.assert_called_once() + assert result == mock_response + + def test_yandex_lockbox_secret_backend_folder_id(self): + folder_id = "id1" + + res = LockboxSecretBackend( + folder_id=folder_id, + ).folder_id + + assert res == folder_id + + @patch("airflow.models.connection.Connection.get_connection_from_secrets") + def test_yandex_lockbox_secret_backend_folder_id_from_connection(self, mock_get_connection): + folder_id = "id1" + + mock_get_connection.return_value = Mock( + connection_id=default_conn_name, + extra_dejson={"folder_id": folder_id}, + ) + + sm = LockboxSecretBackend() + _ = sm._client + res = sm.folder_id + + assert res == folder_id + + def test_yandex_lockbox_secret_backend__get_field_connection_not_specified(self): + sm = LockboxSecretBackend() + sm.yc_connection_id = None + res = sm._get_field("somefield") + + assert res is None From 2622b59e89d1d2d05778dd84d6efec0577d5906b Mon Sep 17 00:00:00 2001 From: Vadim Vladimirov Date: Wed, 10 Jan 2024 22:46:05 +0700 Subject: [PATCH 4/4] docs: Yandex LockboxSecretBackend --- .../apache-airflow-providers-yandex/index.rst | 1 + .../yandex-cloud-lockbox-secret-backend.rst | 293 ++++++++++++++++++ 2 files changed, 294 insertions(+) create mode 100644 docs/apache-airflow-providers-yandex/secrets-backends/yandex-cloud-lockbox-secret-backend.rst diff --git 
a/docs/apache-airflow-providers-yandex/index.rst b/docs/apache-airflow-providers-yandex/index.rst index 7a1736ac518d7..cfe0f68b30945 100644 --- a/docs/apache-airflow-providers-yandex/index.rst +++ b/docs/apache-airflow-providers-yandex/index.rst @@ -36,6 +36,7 @@ Configuration Connection types + Lockbox Secret Backend Operators .. toctree:: diff --git a/docs/apache-airflow-providers-yandex/secrets-backends/yandex-cloud-lockbox-secret-backend.rst b/docs/apache-airflow-providers-yandex/secrets-backends/yandex-cloud-lockbox-secret-backend.rst new file mode 100644 index 0000000000000..9403dad0ea12f --- /dev/null +++ b/docs/apache-airflow-providers-yandex/secrets-backends/yandex-cloud-lockbox-secret-backend.rst @@ -0,0 +1,293 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +Yandex.Cloud Lockbox Secret Backend +=================================== + +This topic describes how to configure Apache Airflow to use `Yandex Lockbox `__ +as a secret backend and how to manage secrets. + +Before you begin +---------------- + +Before you start, make sure you have installed the ``yandex`` provider in your Apache Airflow installation: + +.. 
code-block:: bash + + pip install apache-airflow-providers-yandex + +Enabling the Yandex Lockbox secret backend +------------------------------------------ + +To enable Yandex Lockbox as secrets backend, +specify :py:class:`~airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend` +as the ``backend`` in ``[secrets]`` section of ``airflow.cfg`` file. + +Here is a sample configuration: + +.. code-block:: ini + + [secrets] + backend = airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend + +You can also set this with an environment variable: + +.. code-block:: bash + + export AIRFLOW__SECRETS__BACKEND=airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend + +You can verify the correct setting of the configuration options by using the ``airflow config get-value`` command: + +.. code-block:: console + + $ airflow config get-value secrets backend + airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend + +Backend parameters +------------------ + +The next step is to configure backend parameters using the ``backend_kwargs`` options. +You can pass the following parameters: + +* ``yc_oauth_token``: Specifies the user account OAuth token to connect to Yandex Lockbox with. Looks like ``y3_xxxxx``. +* ``yc_sa_key_json``: Specifies the service account auth JSON. Looks like ``{"id": "...", "service_account_id": "...", "private_key": "..."}``. +* ``yc_sa_key_json_path``: Specifies the service account auth JSON file path. Looks like ``/home/airflow/authorized_key.json``. File content looks like ``{"id": "...", "service_account_id": "...", "private_key": "..."}``. +* ``yc_connection_id``: Specifies the connection ID to connect to Yandex Lockbox with. Default: "yandexcloud_default" +* ``folder_id``: Specifies the folder ID to search for Yandex Lockbox secrets in. If set to None (null in JSON), requests will use the connection folder_id if specified. +* ``connections_prefix``: Specifies the prefix of the secret to read to get Connections. 
If set to None (null in JSON), requests for connections will not be sent to Yandex Lockbox. Default: "airflow/connections" +* ``variables_prefix``: Specifies the prefix of the secret to read to get Variables. If set to None (null in JSON), requests for variables will not be sent to Yandex Lockbox. Default: "airflow/variables" +* ``config_prefix``: Specifies the prefix of the secret to read to get Configurations. If set to None (null in JSON), requests for configurations will not be sent to Yandex Lockbox. Default: "airflow/config" +* ``sep``: Specifies the separator used to concatenate secret_prefix and secret_id. Default: "/" +* ``endpoint``: Specifies an API endpoint. Leave blank to use default. + +All options should be passed as a JSON dictionary. + +For example, if you want to set parameter ``connections_prefix`` to ``"example-connections-prefix"`` +and parameter ``variables_prefix`` to ``"example-variables-prefix"``, +your configuration file should look like this: + +.. code-block:: ini + + [secrets] + backend = airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend + backend_kwargs = {"connections_prefix": "example-connections-prefix", "variables_prefix": "example-variables-prefix"} + +Set-up credentials +------------------ + +You need to specify credentials or the ID of a Yandex Cloud connection to connect to Yandex Lockbox with. +Credentials will be used with this priority: + +* OAuth Token +* Service Account JSON file +* Service Account JSON +* Yandex Cloud Connection + +If no credentials are specified, the default connection ID ``yandexcloud_default`` will be used. + +Using OAuth token for authorization as users account +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +First, you need to create `OAuth token `__ for a user account. +It will look like ``y3_Vdheub7w9bIut67GHeL345gfb5GAnd3dZnf08FRbvjeUFvetYiohGvc``. + +Then you need to specify the ``folder_id`` and token in the ``backend_kwargs``: + +.. 
code-block:: ini + + [secrets] + backend_kwargs = {"folder_id": "b1g66mft1vopnevbn57j", "yc_oauth_token": "y3_Vdheub7w9bIut67GHeL345gfb5GAnd3dZnf08FRbvjeUFvetYiohGvc"} + +Using Authorized keys for authorization as service account +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Before you start, make sure you have `created `__ +a Yandex Cloud `Service Account `__ +with the permissions ``lockbox.viewer`` and ``lockbox.payloadViewer``. + +First, you need to create `Authorized key `__ +for your service account and save the generated JSON file with public and private key parts. + +Then you need to specify the ``folder_id`` and key in the ``backend_kwargs``: + +.. code-block:: ini + + [secrets] + backend_kwargs = {"folder_id": "b1g66mft1vopnevbn57j", "yc_sa_key_json": {"id": "...", "service_account_id": "...", "private_key": "..."}} + +Alternatively, you can specify the path to the JSON file in the ``backend_kwargs``: + +.. code-block:: ini + + [secrets] + backend_kwargs = {"folder_id": "b1g66mft1vopnevbn57j", "yc_sa_key_json_path": "/home/airflow/authorized_key.json"} + +Using Yandex Cloud Connection for authorization +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +First, you need to create :ref:`Yandex Cloud Connection `. + +Then you need to specify the ``yc_connection_id`` in the ``backend_kwargs``: + +.. code-block:: ini + + [secrets] + backend_kwargs = {"yc_connection_id": "my_yc_connection"} + +If no credentials are specified, Lockbox Secret Backend will try to use the default connection ID ``yandexcloud_default``. + +Lockbox Secret Backend will try to use default folder id from Connection, +also you can specify the ``folder_id`` in the ``backend_kwargs``: + +.. 
code-block:: ini + + [secrets] + backend_kwargs = {"folder_id": "b1g66mft1vopnevbn57j", "yc_connection_id": "my_yc_connection"} + +Storing and Retrieving Connections +---------------------------------- + +To store a Connection, you need to `create secret `__ +with name in format ``{connections_prefix}{sep}{connection_name}`` +and payload contains text value with any key. + +Storing a Connection as a URI +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The main way is to save connections as a :ref:`connection URI representation `. + +Example: ``mysql://myname:mypassword@myhost.com?this_param=some+val&that_param=other+val%2A`` + +Here is an example of secret creation with the ``yc`` cli: + +.. code-block:: console + + $ yc lockbox secret create \ + --name airflow/connections/mysqldb \ + --payload '[{"key": "value", "text_value": "mysql://myname:mypassword@myhost.com?this_param=some+val&that_param=other+val%2A"}]' + done (1s) + name: airflow/connections/mysqldb + +Storing a Connection as a JSON +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Alternatively, you can save connections in JSON format: + +.. code-block:: json + + { + "conn_type": "mysql", + "host": "myhost.com", + "login": "myname", + "password": "mypassword", + "extra": { + "this_param": "some val", + "that_param": "other val*" + } + } + +Here is an example of secret creation with the ``yc`` cli: + +.. code-block:: console + + $ yc lockbox secret create \ + --name airflow/connections/mysqldbjson \ + --payload '[{"key": "value", "text_value": "{\"conn_type\": \"mysql\", \"host\": \"myhost.com\", \"login\": \"myname\", \"password\": \"mypassword\", \"extra\": {\"this_param\": \"some val\", \"that_param\": \"other val*\"}}"}]' + done (1s) + name: airflow/connections/mysqldbjson + +Retrieving Connection +~~~~~~~~~~~~~~~~~~~~~ + +To check the connection is correctly read from the Lockbox Secret Backend, you can use ``airflow connections get``: + +.. 
code-block:: console + + $ airflow connections get mysqldb -o json + [{"id": null, "conn_id": "mysqldb", "conn_type": "mysql", "description": null, "host": "myhost.com", "schema": "", "login": "myname", "password": "mypassword", "port": null, "is_encrypted": "False", "is_extra_encrypted": "False", "extra_dejson": {"this_param": "some val", "that_param": "other val*"}, "get_uri": "mysql://myname:mypassword@myhost.com/?this_param=some+val&that_param=other+val%2A"}] + +Storing and Retrieving Variables +-------------------------------- + +To store a Variable, you need to `create secret `__ +with name in format ``{variables_prefix}{sep}{variable_name}`` +and payload contains text value with any key. + +This is an example variable value: ``some_secret_data`` + +Here is an example of secret creation with the ``yc`` cli: + +.. code-block:: console + + $ yc lockbox secret create \ + --name airflow/variables/my_variable \ + --payload '[{"key": "value", "text_value": "some_secret_data"}]' + done (1s) + name: airflow/variables/my_variable + +To check the variable is correctly read from the Lockbox Secret Backend, you can use ``airflow variables get``: + +.. code-block:: console + + $ airflow variables get my_variable + some_secret_data + +Storing and Retrieving Configs +------------------------------ + +You can store some sensitive configs in the Lockbox Secret Backend. + +For example, we will provide a secret for ``sentry.sentry_dsn`` and use ``sentry_dsn_value`` as the config value name. + +To store a Config, you need to `create secret `__ +with name in format ``{config_prefix}{sep}{config_value_name}`` +and payload contains text value with any key. + +Here is an example of secret creation with the ``yc`` cli: + +.. 
code-block:: console + + $ yc lockbox secret create \ + --name airflow/config/sentry_dsn_value \ + --payload '[{"key": "value", "text_value": "https://public@sentry.example.com/1"}]' + done (1s) + name: airflow/config/sentry_dsn_value + +Then, we need to specify the config value name as ``{key}_secret`` in the Apache Airflow configuration: + +.. code-block:: ini + + [sentry] + sentry_dsn_secret = sentry_dsn_value + +To check the config value is correctly read from the Lockbox Secret Backend, you can use ``airflow config get-value``: + +.. code-block:: console + + $ airflow config get-value sentry sentry_dsn + https://public@sentry.example.com/1 + +Clean up +-------- + +You can easily delete your secret with the ``yc`` cli: + +.. code-block:: console + + $ yc lockbox secret delete --name airflow/connections/mysqldb + name: airflow/connections/mysqldb