diff --git a/airflow/providers/slack/hooks/slack.py b/airflow/providers/slack/hooks/slack.py index c00f0106e1e4d..cb12132eafd72 100644 --- a/airflow/providers/slack/hooks/slack.py +++ b/airflow/providers/slack/hooks/slack.py @@ -15,27 +15,48 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""Hook for Slack""" -from typing import Any, Optional + +import json +import warnings +from typing import TYPE_CHECKING, Any, Dict, List, Optional from slack_sdk import WebClient -from slack_sdk.web.slack_response import SlackResponse +from slack_sdk.errors import SlackApiError -from airflow.exceptions import AirflowException +from airflow.compat.functools import cached_property +from airflow.exceptions import AirflowException, AirflowNotFoundException from airflow.hooks.base import BaseHook +from airflow.providers.slack.utils import ConnectionExtraConfig, prefixed_extra_field +from airflow.utils.log.secrets_masker import mask_secret + +if TYPE_CHECKING: + from slack_sdk.http_retry import RetryHandler + from slack_sdk.web.slack_response import SlackResponse class SlackHook(BaseHook): """ - Creates a Slack connection to be used for calls. + Creates a Slack API Connection to be used for calls. + + This class provide a thin wrapper around the ``slack_sdk.WebClient``. + + .. seealso:: + - :ref:`Slack API connection ` + - https://api.slack.com/messaging + - https://slack.dev/python-slack-sdk/web/index.html + + .. warning:: + This hook intend to use `Slack API` connection + and might not work correctly with `Slack Webhook` and `HTTP` connections. Takes both Slack API token directly and connection that has Slack API token. If both are supplied, Slack API token will be used. Also exposes the rest of slack.WebClient args. + Examples: - .. code-block:: python + .. code-block:: python # Create hook - slack_hook = SlackHook(token="xxx") # or slack_hook = SlackHook(slack_conn_id="slack") + slack_hook = SlackHook(slack_conn_id="slack_api_default") # Call generic API with parameters (errors are handled by hook) # For more details check https://api.slack.com/methods/chat.postMessage @@ -45,28 +66,124 @@ class SlackHook(BaseHook): # For more details check https://slack.dev/python-slack-sdk/web/index.html#messaging slack_hook.client.chat_postMessage(channel="#random", text="Hello world!") - :param token: Slack API token :param slack_conn_id: :ref:`Slack connection id ` that has Slack API token in the password field. - :param use_session: A boolean specifying if the client should take advantage of - connection pooling. Default is True. - :param base_url: A string representing the Slack API base URL. Default is - ``https://www.slack.com/api/`` - :param timeout: The maximum number of seconds the client will wait - to connect and receive a response from Slack. Default is 30 seconds. + :param timeout: The maximum number of seconds the client will wait to connect + and receive a response from Slack. If not set than default WebClient value will use. + :param base_url: A string representing the Slack API base URL. + If not set than default WebClient BASE_URL will use (``https://www.slack.com/api/``). + :param proxy: Proxy to make the Slack Incoming Webhook call. + :param retry_handlers: List of handlers to customize retry logic in WebClient. + :param token: (deprecated) Slack API Token. """ + conn_name_attr = 'slack_conn_id' + default_conn_name = 'slack_api_default' + conn_type = 'slack' + hook_name = 'Slack API' + def __init__( self, token: Optional[str] = None, slack_conn_id: Optional[str] = None, - **client_args: Any, + base_url: Optional[str] = None, + timeout: Optional[int] = None, + proxy: Optional[str] = None, + retry_handlers: Optional[List["RetryHandler"]] = None, + **extra_client_args: Any, ) -> None: + if not token and not slack_conn_id: + raise AirflowException("Either `slack_conn_id` or `token` should be provided.") + if token: + mask_secret(token) + warnings.warn( + "Provide token as hook argument deprecated by security reason and will be removed " + "in a future releases. Please specify token in `Slack API` connection.", + DeprecationWarning, + stacklevel=2, + ) + if not slack_conn_id: + warnings.warn( + "You have not set parameter `slack_conn_id`. Currently `Slack API` connection id optional " + "but in a future release it will mandatory.", + FutureWarning, + stacklevel=2, + ) + super().__init__() - self.token = self.__get_token(token, slack_conn_id) - self.client = WebClient(self.token, **client_args) + self._token = token + self.slack_conn_id = slack_conn_id + self.base_url = base_url + self.timeout = timeout + self.proxy = proxy + self.retry_handlers = retry_handlers + self.extra_client_args = extra_client_args + if self.extra_client_args.pop("use_session", None) is not None: + warnings.warn("`use_session` has no affect in slack_sdk.WebClient.", UserWarning, stacklevel=2) + + @cached_property + def client(self) -> WebClient: + """Get the underlying slack_sdk.WebClient (cached).""" + return WebClient(**self._get_conn_params()) + + def get_conn(self) -> WebClient: + """Get the underlying slack_sdk.WebClient (cached).""" + return self.client + + def _get_conn_params(self) -> Dict[str, Any]: + """Fetch connection params as a dict and merge it with hook parameters.""" + conn = self.get_connection(self.slack_conn_id) if self.slack_conn_id else None + conn_params: Dict[str, Any] = {} + + if self._token: + conn_params["token"] = self._token + elif conn: + if not conn.password: + raise AirflowNotFoundException( + f"Connection ID {self.slack_conn_id!r} does not contain password (Slack API Token)." + ) + conn_params["token"] = conn.password + + extra_config = ConnectionExtraConfig( + conn_type=self.conn_type, + conn_id=conn.conn_id if conn else None, + extra=conn.extra_dejson if conn else {}, + ) + + # Merge Hook parameters with Connection config + conn_params.update( + { + "timeout": self.timeout or extra_config.getint("timeout", default=None), + "base_url": self.base_url or extra_config.get("base_url", default=None), + "proxy": self.proxy or extra_config.get("proxy", default=None), + "retry_handlers": ( + self.retry_handlers or extra_config.getimports("retry_handlers", default=None) + ), + } + ) + + # Add additional client args + conn_params.update(self.extra_client_args) + if "logger" not in conn_params: + conn_params["logger"] = self.log + + return {k: v for k, v in conn_params.items() if v is not None} + + @cached_property + def token(self) -> str: + warnings.warn( + "`SlackHook.token` property deprecated and will be removed in a future releases.", + DeprecationWarning, + stacklevel=2, + ) + return self._get_conn_params()["token"] def __get_token(self, token: Any, slack_conn_id: Any) -> str: + warnings.warn( + "`SlackHook.__get_token` method deprecated and will be removed in a future releases.", + DeprecationWarning, + stacklevel=2, + ) if token is not None: return token @@ -79,7 +196,7 @@ def __get_token(self, token: Any, slack_conn_id: Any) -> str: raise AirflowException('Cannot get token: No valid Slack token nor slack_conn_id supplied.') - def call(self, api_method: str, **kwargs) -> SlackResponse: + def call(self, api_method: str, **kwargs) -> "SlackResponse": """ Calls Slack WebClient `WebClient.api_call` with given arguments. @@ -95,3 +212,78 @@ def call(self, api_method: str, **kwargs) -> SlackResponse: iterated on to execute subsequent requests. """ return self.client.api_call(api_method, **kwargs) + + def test_connection(self): + """Tests the Slack API connection. + + .. seealso:: + https://api.slack.com/methods/auth.test + """ + try: + response = self.call("auth.test") + response.validate() + except SlackApiError as e: + return False, str(e) + except Exception as e: + return False, f"Unknown error occurred while testing connection: {e}" + + if isinstance(response.data, bytes): + # If response data binary then return simple message + return True, f"Connection successfully tested (url: {response.api_url})." + + try: + return True, json.dumps(response.data) + except TypeError: + return True, str(response) + + @classmethod + def get_connection_form_widgets(cls) -> Dict[str, Any]: + """Returns dictionary of widgets to be added for the hook to handle extra values.""" + from flask_appbuilder.fieldwidgets import BS3TextFieldWidget + from flask_babel import lazy_gettext + from wtforms import IntegerField, StringField + + return { + prefixed_extra_field("timeout", cls.conn_type): IntegerField( + lazy_gettext("Timeout"), + widget=BS3TextFieldWidget(), + description="Optional. The maximum number of seconds the client will wait to connect " + "and receive a response from Slack API.", + ), + prefixed_extra_field("base_url", cls.conn_type): StringField( + lazy_gettext('Base URL'), + widget=BS3TextFieldWidget(), + description="Optional. A string representing the Slack API base URL.", + ), + prefixed_extra_field("proxy", cls.conn_type): StringField( + lazy_gettext('Proxy'), + widget=BS3TextFieldWidget(), + description="Optional. Proxy to make the Slack API call.", + ), + prefixed_extra_field("retry_handlers", cls.conn_type): StringField( + lazy_gettext('Retry Handlers'), + widget=BS3TextFieldWidget(), + description="Optional. Comma separated list of import paths to zero-argument callable " + "which returns retry handler for Slack WebClient.", + ), + } + + @classmethod + def get_ui_field_behaviour(cls) -> Dict[str, Any]: + """Returns custom field behaviour.""" + return { + "hidden_fields": ["login", "port", "host", "schema", "extra"], + "relabeling": { + "password": "Slack API Token", + }, + "placeholders": { + "password": "xoxb-1234567890123-09876543210987-AbCdEfGhIjKlMnOpQrStUvWx", + prefixed_extra_field("timeout", cls.conn_type): "30", + prefixed_extra_field("base_url", cls.conn_type): "https://www.slack.com/api/", + prefixed_extra_field("proxy", cls.conn_type): "http://localhost:9000", + prefixed_extra_field("retry_handlers", cls.conn_type): ( + "slack_sdk.http_retry.builtin_handlers.ConnectionErrorRetryHandler," + "slack_sdk.http_retry.builtin_handlers.RateLimitErrorRetryHandler" + ), + }, + } diff --git a/airflow/providers/slack/hooks/slack_webhook.py b/airflow/providers/slack/hooks/slack_webhook.py index 27d45c0741090..b05c375a2e61d 100644 --- a/airflow/providers/slack/hooks/slack_webhook.py +++ b/airflow/providers/slack/hooks/slack_webhook.py @@ -31,6 +31,10 @@ class SlackWebhookHook(HttpHook): If both supplied, http_conn_id will be used as base_url, and webhook_token will be taken as endpoint, the relative path of the url. + .. warning:: + This hook intend to use `Slack Webhook` connection + and might not work correctly with `Slack API` connection. + Each Slack webhook token can be pre-configured to use a specific channel, username and icon. You can override these defaults in this hook. diff --git a/airflow/providers/slack/operators/slack.py b/airflow/providers/slack/operators/slack.py index 1aa5edc22be4a..0c4ec53df2b85 100644 --- a/airflow/providers/slack/operators/slack.py +++ b/airflow/providers/slack/operators/slack.py @@ -29,7 +29,7 @@ class SlackAPIOperator(BaseOperator): In the future additional Slack API Operators will be derived from this class as well. Only one of `slack_conn_id` and `token` is required. - :param slack_conn_id: :ref:`Slack connection id ` + :param slack_conn_id: :ref:`Slack API Connection ` which its password is Slack API token. Optional :param token: Slack API token (https://api.slack.com/web). Optional :param method: The Slack API Method to Call (https://api.slack.com/methods). Optional diff --git a/airflow/providers/slack/provider.yaml b/airflow/providers/slack/provider.yaml index 57843af639867..25804659a0967 100644 --- a/airflow/providers/slack/provider.yaml +++ b/airflow/providers/slack/provider.yaml @@ -68,5 +68,7 @@ transfers: how-to-guide: /docs/apache-airflow-providers-slack/operators/sql_to_slack.rst connection-types: + - hook-class-name: airflow.providers.slack.hooks.slack.SlackHook + connection-type: slack - hook-class-name: airflow.providers.slack.hooks.slack_webhook.SlackWebhookHook connection-type: slackwebhook diff --git a/airflow/providers/slack/utils/__init__.py b/airflow/providers/slack/utils/__init__.py new file mode 100644 index 0000000000000..483a540092eb6 --- /dev/null +++ b/airflow/providers/slack/utils/__init__.py @@ -0,0 +1,98 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import Any, Dict, Optional + +from airflow.utils.module_loading import import_string + +try: + from airflow.utils.types import NOTSET +except ImportError: # TODO: Remove when the provider has an Airflow 2.3+ requirement. + + class ArgNotSet: + """Sentinel type for annotations, useful when None is not viable.""" + + NOTSET = ArgNotSet() # type: ignore[assignment] + + +def prefixed_extra_field(field: str, conn_type: str) -> str: + """Get prefixed extra field name.""" + return f"extra__{conn_type}__{field}" + + +class ConnectionExtraConfig: + """Helper class for rom Connection Extra. + + :param conn_type: Hook connection type. + :param conn_id: Connection ID uses for appropriate error messages. + :param extra: Connection extra dictionary. + """ + + def __init__(self, conn_type: str, conn_id: Optional[str] = None, extra: Optional[Dict[str, Any]] = None): + super().__init__() + self.conn_type = conn_type + self.conn_id = conn_id + self.extra = extra or {} + + def get(self, field, default: Any = NOTSET): + """Get specified field from Connection Extra. + + :param field: Connection extra field name. + :param default: If specified then use as default value if field not present in Connection Extra. + """ + prefixed_field = prefixed_extra_field(field, self.conn_type) + if prefixed_field in self.extra: + return self.extra[prefixed_field] + elif field in self.extra: + return self.extra[field] + else: + if default is NOTSET: + raise KeyError( + f"Couldn't find {prefixed_field!r} or {field!r} " + f"in Connection ({self.conn_id!r}) Extra and no default value specified." + ) + return default + + def getint(self, field, default: Any = NOTSET) -> Any: + """Get specified field from Connection Extra and evaluate as integer. + + :param field: Connection extra field name. + :param default: If specified then use as default value if field not present in Connection Extra. + """ + value = self.get(field=field, default=default) + if value != default: + value = int(value) + return value + + def getimports(self, field, default: Any = NOTSET) -> Any: + """Get specified field from Connection Extra and imports the full qualified name separated by comma. + + .. note:: + This method intends to use with zero-argument callable objects. + + :param field: Connection extra field name. + :param default: If specified then use as default value if field not present in Connection Extra. + """ + value = self.get(field=field, default=default) + if value != default: + if not isinstance(value, str): + raise TypeError( + f"Connection ({self.conn_id!r}) Extra {field!r} expected string " + f"when return value not equal `default={default}`, got {type(value).__name__}." + ) + value = [import_string(part.strip())() for part in value.split(",")] + return value diff --git a/docs/apache-airflow-providers-slack/connections/index.rst b/docs/apache-airflow-providers-slack/connections/index.rst new file mode 100644 index 0000000000000..4a917420f6301 --- /dev/null +++ b/docs/apache-airflow-providers-slack/connections/index.rst @@ -0,0 +1,28 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + + +Slack Connections +================= + + +.. toctree:: + :maxdepth: 1 + :glob: + + * diff --git a/docs/apache-airflow-providers-slack/connections/slack.rst b/docs/apache-airflow-providers-slack/connections/slack.rst index 0cf58904ff25f..ffcb128660230 100644 --- a/docs/apache-airflow-providers-slack/connections/slack.rst +++ b/docs/apache-airflow-providers-slack/connections/slack.rst @@ -19,10 +19,10 @@ .. _howto/connection:slack: -Slack Connection -================ +Slack API Connection +==================== -The Slack connection type enables Slack Integrations. +The Slack connection type enables Slack API Integrations. Authenticating to Slack ----------------------- @@ -33,21 +33,54 @@ Authenticate to Slack using a `Slack API token Default Connection IDs ---------------------- -The SlackHook and SlackAPIOperator use ``slack_default`` by default. +.. warning:: + + The SlackHook and community provided operators not intend to use any Slack API Connection by default right now. + It might change in the future to ``slack_api_default``. Configuring the Connection -------------------------- -Password (optional) +Password Specify the Slack API token. -When specifying the connection in environment variable you should specify -it using URI syntax. +Extra (optional) + Specify the extra parameters (as json dictionary) that can be used in slack_sdk.WebClient. + All parameters are optional. + + * ``timeout``: The maximum number of seconds the client will wait to connect and receive a response from Slack API. + * ``base_url``: A string representing the Slack API base URL. + * ``proxy``: Proxy to make the Slack Incoming Webhook call. + * ``retry_handlers``: Comma separated list of import paths to zero-argument callable which returns retry handler + for Slack WebClient. + +If you are configuring the connection via a URI, ensure that all components of the URI are URL-encoded. + +Examples +-------- + +**Set Slack API Connection as Environment Variable (URI)** + .. code-block:: bash + + export AIRFLOW_CONN_SLACK_API_DEFAULT='slack://:xoxb-1234567890123-09876543210987-AbCdEfGhIjKlMnOpQrStUvWx@/?timeout=42' + +**Snippet for create Connection as URI**: + .. code-block:: python -Note that all components of the URI should be URL-encoded. + from airflow.models.connection import Connection -For example: + conn = Connection( + conn_id="slack_api_default", + conn_type="slack", + password="xoxb-1234567890123-09876543210987-AbCdEfGhIjKlMnOpQrStUvWx", + extra={ + # Specify extra parameters here + "timeout": "42", + }, + ) -.. code-block:: bash + # Generate Environment Variable Name + env_key = f"AIRFLOW_CONN_{conn.conn_id.upper()}" - export AIRFLOW_CONN_SLACK_DEFAULT='slack://:token@' + print(f"{env_key}='{conn.get_uri()}'") + # AIRFLOW_CONN_SLACK_API_DEFAULT='slack://:xoxb-1234567890123-09876543210987-AbCdEfGhIjKlMnOpQrStUvWx@/?timeout=42' diff --git a/docs/apache-airflow-providers-slack/index.rst b/docs/apache-airflow-providers-slack/index.rst index d4b094709c542..e67929b9179e4 100644 --- a/docs/apache-airflow-providers-slack/index.rst +++ b/docs/apache-airflow-providers-slack/index.rst @@ -42,9 +42,9 @@ Content :maxdepth: 1 :caption: References - Connection Types + Connection Types Python API <_api/airflow/providers/slack/index> - Example DAGs + Example DAGs .. toctree:: :maxdepth: 1 diff --git a/tests/providers/slack/hooks/test_slack.py b/tests/providers/slack/hooks/test_slack.py index e25f80b9dc784..9981ed331adf0 100644 --- a/tests/providers/slack/hooks/test_slack.py +++ b/tests/providers/slack/hooks/test_slack.py @@ -16,95 +16,267 @@ # specific language governing permissions and limitations # under the License. -import unittest +from typing import Any, Dict from unittest import mock import pytest from slack_sdk.errors import SlackApiError +from slack_sdk.http_retry.builtin_handlers import ConnectionErrorRetryHandler, RateLimitErrorRetryHandler +from slack_sdk.web.slack_response import SlackResponse -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, AirflowNotFoundException +from airflow.models.connection import Connection from airflow.providers.slack.hooks.slack import SlackHook +MOCK_SLACK_API_TOKEN = "xoxb-1234567890123-09876543210987-AbCdEfGhIjKlMnOpQrStUvWx" +SLACK_API_DEFAULT_CONN_ID = SlackHook.default_conn_name +SLACK_CONN_TYPE = "slack" +TEST_CONN_ERROR_RETRY_HANDLER = ConnectionErrorRetryHandler(max_retry_count=42) +TEST_RATE_LIMIT_RETRY_HANDLER = RateLimitErrorRetryHandler() + +CONN_TYPE = "slack-incoming-webhook" +VALID_CONN_IDS = [ + SlackHook.default_conn_name, + "conn_full_url_connection", + "conn_host_with_schema", + "conn_parts", +] + + +def conn_error_retry_handler(): + return TEST_CONN_ERROR_RETRY_HANDLER + + +def rate_limit_retry_handler(): + return TEST_RATE_LIMIT_RETRY_HANDLER + + +@pytest.fixture(scope="module", autouse=True) +def slack_api_connections(): + """Create tests connections.""" + connections = [ + Connection( + conn_id=SLACK_API_DEFAULT_CONN_ID, + conn_type=CONN_TYPE, + password=MOCK_SLACK_API_TOKEN, + ), + Connection( + conn_id="compat_http_type", + conn_type="http", + password=MOCK_SLACK_API_TOKEN, + ), + Connection( + conn_id="empty_slack_connection", + conn_type=CONN_TYPE, + ), + ] + + conn_uris = {f"AIRFLOW_CONN_{c.conn_id.upper()}": c.get_uri() for c in connections} + + with mock.patch.dict("os.environ", values=conn_uris): + yield + + +class TestSlackHook: + def test_token_arg_deprecated(self): + """Test deprecation warning if token provided as hook argument.""" + warning_message = ( + "Provide token as hook argument deprecated by security reason and will be removed " + r"in a future releases. Please specify token in `Slack API` connection\." + ) + with pytest.warns(DeprecationWarning, match=warning_message): + SlackHook(slack_conn_id=SLACK_API_DEFAULT_CONN_ID, token="foo-bar") + + def test_token_property_deprecated(self): + """Test deprecation warning if access to ``SlackHook.token`` property.""" + hook = SlackHook(slack_conn_id=SLACK_API_DEFAULT_CONN_ID) + warning_message = r"`SlackHook.token` property deprecated and will be removed in a future releases\." + with pytest.warns(DeprecationWarning, match=warning_message): + assert hook.token == MOCK_SLACK_API_TOKEN + + def test_optional_conn_id_deprecated(self): + """Test deprecation warning if not set connection ID.""" + warning_message = ( + r"You have not set parameter `slack_conn_id`\. Currently `Slack API` connection id optional " + r"but in a future release it will mandatory\." + ) + with pytest.warns(FutureWarning, match=warning_message): + SlackHook(token="foo-bar") + + def test_use_session_has_no_affect(self): + """Test that specified previously in docstring `use_session` take no affect.""" + warning_message = r"`use_session` has no affect in slack_sdk\.WebClient\." + with pytest.warns(UserWarning, match=warning_message): + hook = SlackHook(slack_conn_id=SLACK_API_DEFAULT_CONN_ID, use_session="foo-bar") + assert "use_session" not in hook.extra_client_args -class TestSlackHook(unittest.TestCase): def test_get_token_with_token_only(self): - """tests `__get_token` method when only token is provided""" - # Given - test_token = 'test_token' - test_conn_id = None + """Test retrieve token when only hook arg provided without Slack API Connection ID.""" + test_hook_arg_token = 'xapp-1-arg-token' + assert SlackHook(test_hook_arg_token, None)._get_conn_params()["token"] == test_hook_arg_token - # Run - hook = SlackHook(test_token, test_conn_id) + @pytest.mark.parametrize( + "conn_id", + [ + SLACK_API_DEFAULT_CONN_ID, + "compat_http_type", # In case if users use Slack Webhook Connection ID from UI for Slack API + ], + ) + def test_get_token_from_connection(self, conn_id): + """Test retrieve token from Slack API Connection ID.""" + hook = SlackHook(slack_conn_id=SLACK_API_DEFAULT_CONN_ID) + assert hook._get_conn_params()["token"] == MOCK_SLACK_API_TOKEN - # Assert - output = hook.token - expected = test_token - assert output == expected + def test_resolve_token(self): + """Test retrieve token when both hook arg and Slack API Connection ID provided.""" + test_hook_arg_token = 'xapp-1-arg-token' + hook = SlackHook(slack_conn_id=SLACK_API_DEFAULT_CONN_ID, token=test_hook_arg_token) + assert hook._get_conn_params()["token"] == test_hook_arg_token + def test_nor_token_and_nor_conn_id_provided(self): + """Test neither hook arg and Slack API Connection ID provided.""" + with pytest.raises(AirflowException, match=r"Either `slack_conn_id` or `token` should be provided\."): + SlackHook(slack_conn_id=None, token=None) + + def test_empty_password(self): + """Test password field defined in the connection.""" + hook = SlackHook(slack_conn_id="empty_slack_connection") + error_message = r"Connection ID '.*' does not contain password \(Slack API Token\)\." + with pytest.raises(AirflowNotFoundException, match=error_message): + hook._get_conn_params() + + @pytest.mark.parametrize( + "hook_config,conn_extra,expected", + [ + ( # Test Case: hook config + { + "timeout": 42, + "base_url": "http://hook-base-url:1234", + "proxy": "https://hook-proxy:1234", + "retry_handlers": [TEST_CONN_ERROR_RETRY_HANDLER, TEST_RATE_LIMIT_RETRY_HANDLER], + }, + {}, + { + "timeout": 42, + "base_url": "http://hook-base-url:1234", + "proxy": "https://hook-proxy:1234", + "retry_handlers": [TEST_CONN_ERROR_RETRY_HANDLER, TEST_RATE_LIMIT_RETRY_HANDLER], + }, + ), + ( # Test Case: connection config + {}, + { + "timeout": "9000", + "base_url": "http://conn-base-url:4321", + "proxy": "https://conn-proxy:4321", + "retry_handlers": ( + "tests.providers.slack.hooks.test_slack.rate_limit_retry_handler," + "tests.providers.slack.hooks.test_slack.conn_error_retry_handler" + ), + }, + { + "timeout": 9000, + "base_url": "http://conn-base-url:4321", + "proxy": "https://conn-proxy:4321", + "retry_handlers": [TEST_RATE_LIMIT_RETRY_HANDLER, TEST_CONN_ERROR_RETRY_HANDLER], + }, + ), + ( # Test Case: Connection from the UI + {}, + { + "extra__slack__timeout": 9000, + "extra__slack__base_url": "http://conn-base-url:4321", + "extra__slack__proxy": "https://conn-proxy:4321", + "extra__slack__retry_handlers": ( + "tests.providers.slack.hooks.test_slack.rate_limit_retry_handler," + "tests.providers.slack.hooks.test_slack.conn_error_retry_handler" + ), + }, + { + "timeout": 9000, + "base_url": "http://conn-base-url:4321", + "proxy": "https://conn-proxy:4321", + "retry_handlers": [TEST_RATE_LIMIT_RETRY_HANDLER, TEST_CONN_ERROR_RETRY_HANDLER], + }, + ), + ( # Test Case: Merge configs - hook args overwrite conn config + { + "timeout": 1, + "proxy": "https://hook-proxy:777", + "retry_handlers": [TEST_RATE_LIMIT_RETRY_HANDLER], + }, + { + "timeout": 9000, + "proxy": "https://conn-proxy:4321", + "retry_handlers": ( + "tests.providers.slack.hooks.test_slack.rate_limit_retry_handler," + "tests.providers.slack.hooks.test_slack.conn_error_retry_handler" + ), + }, + { + "timeout": 1, + "proxy": "https://hook-proxy:777", + "retry_handlers": [TEST_RATE_LIMIT_RETRY_HANDLER], + }, + ), + ( # Test Case: Merge configs - resolve config + { + "timeout": 1, + }, + { + "timeout": 9000, + "proxy": "https://conn-proxy:4334", + "retry_handlers": ("tests.providers.slack.hooks.test_slack.conn_error_retry_handler"), + }, + { + "timeout": 1, + "proxy": "https://conn-proxy:4334", + "retry_handlers": [TEST_CONN_ERROR_RETRY_HANDLER], + }, + ), + ( # Test Case: empty configs + {}, + {}, + {}, + ), + ( # Test Case: extra_client_args + {"foo": "bar"}, + {}, + {"foo": "bar"}, + ), + ( # Test Case: ignored not expected connection extra + {}, + {"spam": "egg"}, + {}, + ), + ], + ) @mock.patch('airflow.providers.slack.hooks.slack.WebClient') - @mock.patch('airflow.providers.slack.hooks.slack.SlackHook.get_connection') - def test_get_token_with_valid_slack_conn_id_only(self, get_connection_mock, mock_slack_client): - """tests `__get_token` method when only connection is provided""" - # Given - test_token = None - test_conn_id = 'x' - test_password = 'test_password' - - # Mock - get_connection_mock.return_value = mock.Mock(password=test_password) - - # Run - hook = SlackHook(test_token, test_conn_id) - - # Assert - output = hook.token - expected = test_password - assert output == expected - mock_slack_client.assert_called_once_with(test_password) - - @mock.patch('airflow.providers.slack.hooks.slack.SlackHook.get_connection') - def test_get_token_with_no_password_slack_conn_id_only(self, get_connection_mock): - """tests `__get_token` method when only connection is provided""" - - # Mock - conn = mock.Mock() - del conn.password - get_connection_mock.return_value = conn - - # Assert - with pytest.raises(AirflowException): - SlackHook(token=None, slack_conn_id='x') - - @mock.patch('airflow.providers.slack.hooks.slack.SlackHook.get_connection') - def test_get_token_with_empty_password_slack_conn_id_only(self, get_connection_mock): - """tests `__get_token` method when only connection is provided""" - - # Mock - get_connection_mock.return_value = mock.Mock(password=None) - - # Assert - with pytest.raises(AirflowException): - SlackHook(token=None, slack_conn_id='x') - - def test_get_token_with_token_and_slack_conn_id(self): - """tests `__get_token` method when both arguments are provided""" - # Given - test_token = 'test_token' - test_conn_id = 'x' - - # Run - hook = SlackHook(test_token, test_conn_id) - - # Assert - output = hook.token - expected = test_token - assert output == expected - - def test_get_token_with_out_token_nor_slack_conn_id(self): - """tests `__get_token` method when no arguments are provided""" - - with pytest.raises(AirflowException): - SlackHook(token=None, slack_conn_id=None) + def test_client_configuration( + self, mock_webclient_cls, hook_config, conn_extra, expected: Dict[str, Any] + ): + """Test read/parse/merge WebClient config from connection and hook arguments.""" + expected["token"] = MOCK_SLACK_API_TOKEN + test_conn = Connection( + conn_id="test-slack-conn", + conn_type=CONN_TYPE, + password=MOCK_SLACK_API_TOKEN, + extra=conn_extra, + ) + test_conn_env = f"AIRFLOW_CONN_{test_conn.conn_id.upper()}" + mock_webclient = mock_webclient_cls.return_value + + with mock.patch.dict("os.environ", values={test_conn_env: test_conn.get_uri()}): + hook = SlackHook(slack_conn_id=test_conn.conn_id, **hook_config) + expected["logger"] = hook.log + conn_params = hook._get_conn_params() + assert conn_params == expected + + client = hook.client + assert client == mock_webclient + assert hook.get_conn() == mock_webclient + assert hook.get_conn() is client # cached + mock_webclient_cls.assert_called_once_with(**expected) @mock.patch('airflow.providers.slack.hooks.slack.WebClient') def test_call_with_failure(self, slack_client_class_mock): @@ -113,9 +285,7 @@ def test_call_with_failure(self, slack_client_class_mock): expected_exception = SlackApiError(message='foo', response='bar') slack_client_mock.api_call = mock.Mock(side_effect=expected_exception) - test_token = 'test_token' - test_slack_conn_id = 'test_slack_conn_id' - slack_hook = SlackHook(token=test_token, slack_conn_id=test_slack_conn_id) + slack_hook = SlackHook(slack_conn_id=SLACK_API_DEFAULT_CONN_ID) test_method = 'test_method' test_api_params = {'key1': 'value1', 'key2': 'value2'} @@ -128,8 +298,71 @@ def test_api_call(self, slack_client_class_mock): slack_client_class_mock.return_value = slack_client_mock slack_client_mock.api_call.return_value = {'ok': True} - slack_hook = SlackHook(token='test_token') + slack_hook = SlackHook(slack_conn_id=SLACK_API_DEFAULT_CONN_ID) test_api_json = {'channel': 'test_channel'} slack_hook.call("chat.postMessage", json=test_api_json) slack_client_mock.api_call.assert_called_with("chat.postMessage", json=test_api_json) + + @pytest.mark.parametrize( + "response_data", + [ + { + "ok": True, + "url": "https://subarachnoid.slack.com/", + "team": "Subarachnoid Workspace", + "user": "grace", + "team_id": "T12345678", + "user_id": "W12345678", + }, + { + "ok": True, + "url": "https://subarachnoid.slack.com/", + "team": "Subarachnoid Workspace", + "user": "bot", + "team_id": "T0G9PQBBK", + "user_id": "W23456789", + "bot_id": "BZYBOTHED", + }, + b"some-binary-data", + ], + ) + @mock.patch('airflow.providers.slack.hooks.slack.WebClient') + def test_hook_connection_success(self, mock_webclient_cls, response_data): + """Test SlackHook success connection.""" + mock_webclient = mock_webclient_cls.return_value + mock_webclient_call = mock_webclient.api_call + mock_webclient_call.return_value = SlackResponse( + status_code=200, + data=response_data, + # Mock other mandatory SlackResponse arguments + **{ma: mock.MagicMock for ma in ("client", "http_verb", "api_url", "req_args", "headers")}, + ) + hook = SlackHook(slack_conn_id=SLACK_API_DEFAULT_CONN_ID) + conn_test = hook.test_connection() + mock_webclient_call.assert_called_once_with("auth.test") + assert conn_test[0] + + @pytest.mark.parametrize( + "response_data", + [ + {"ok": False, "error": "invalid_auth"}, + {"ok": False, "error": "not_authed"}, + b"some-binary-data", + ], + ) + @mock.patch('airflow.providers.slack.hooks.slack.WebClient') + def test_hook_connection_failed(self, mock_webclient_cls, response_data): + """Test SlackHook failure connection.""" + mock_webclient = mock_webclient_cls.return_value + mock_webclient_call = mock_webclient.api_call + mock_webclient_call.return_value = SlackResponse( + status_code=401, + data=response_data, + # Mock other mandatory SlackResponse arguments + **{ma: mock.MagicMock for ma in ("client", "http_verb", "api_url", "req_args", "headers")}, + ) + hook = SlackHook(slack_conn_id=SLACK_API_DEFAULT_CONN_ID) + conn_test = hook.test_connection() + mock_webclient_call.assert_called_once_with("auth.test") + assert not conn_test[0] diff --git a/tests/providers/slack/utils/__init__.py b/tests/providers/slack/utils/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/providers/slack/utils/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/slack/utils/test_utils.py b/tests/providers/slack/utils/test_utils.py new file mode 100644 index 0000000000000..87baf9547dca6 --- /dev/null +++ b/tests/providers/slack/utils/test_utils.py @@ -0,0 +1,83 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pytest + +from airflow.providers.slack.utils import ConnectionExtraConfig + + +class TestConnectionExtra: + @pytest.mark.parametrize("conn_type", ["slack", "slack_incoming_webhook"]) + def test_get_extra_field(self, conn_type): + """Test get arguments from connection extra: prefixed and not.""" + extra_config = ConnectionExtraConfig( + conn_type=conn_type, + conn_id="test-conn-id", + extra={"arg1": "foo", f"extra__{conn_type}__arg2": "bar"}, + ) + assert extra_config.get("arg1") == "foo" + assert extra_config.get("arg2") == "bar" + + def test_missing_extra_field(self): + """Test missing field in extra.""" + extra_config = ConnectionExtraConfig(conn_type="slack", conn_id="test-conn-id", extra={}) + error_message = ( + r"Couldn't find 'extra__slack__arg_missing' or 'arg_missing' " + r"in Connection \('test-conn-id'\) Extra and no default value specified\." + ) + with pytest.raises(KeyError, match=error_message): + extra_config.get("arg_missing") + + @pytest.mark.parametrize("value", [0, False, "", None], ids=lambda x: f"bool_false_{type(x).__name__}") + def test_default_extra_field(self, value): + """Test default value for missing field in extra.""" + extra_config = ConnectionExtraConfig(conn_type="slack", extra={}) + assert extra_config.get("arg_missing", default=value) == value + + @pytest.mark.parametrize("conn_type", ["slack", "slack_incoming_webhook"]) + def test_both_prefixed_and_not_in_extra_field(self, conn_type): + """Test resolve field from extra when both specified prefixed and not for single field.""" + extra_config = ConnectionExtraConfig( + conn_type=conn_type, + conn_id="test-conn-id", + extra={"arg1": "foo", f"extra__{conn_type}__arg1": "bar"}, + ) + assert extra_config.get("arg1") == "bar" + + def test_get_parse_int(self): + extra_config = ConnectionExtraConfig( + conn_type="slack", + extra={ + "int_arg_1": "42", + "int_arg_2": 9000, + }, + ) + assert extra_config.getint("int_arg_1") == 42 + assert extra_config.getint("int_arg_2") == 9000 + + def test_get_parse_imports(self): + extra_config = ConnectionExtraConfig( + conn_type="slack", + extra={ + "imports_arg_1": "builtins.str", + "imports_arg_2": "builtins.str,builtins.dict", + "imports_arg_3": " builtins.str , builtins.dict ", + }, + ) + assert extra_config.getimports("imports_arg_1") == [''] + assert extra_config.getimports("imports_arg_2") == ['', {}] + assert extra_config.getimports("imports_arg_3") == ['', {}]