-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Refactor Slack API Hook and add new Connection type #25852
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,27 +15,48 @@ | |
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
| """Hook for Slack""" | ||
| from typing import Any, Optional | ||
|
|
||
| import json | ||
| import warnings | ||
| from typing import TYPE_CHECKING, Any, Dict, List, Optional | ||
|
|
||
| from slack_sdk import WebClient | ||
| from slack_sdk.web.slack_response import SlackResponse | ||
| from slack_sdk.errors import SlackApiError | ||
|
|
||
| from airflow.exceptions import AirflowException | ||
| from airflow.compat.functools import cached_property | ||
| from airflow.exceptions import AirflowException, AirflowNotFoundException | ||
| from airflow.hooks.base import BaseHook | ||
| from airflow.providers.slack.utils import ConnectionExtraConfig, prefixed_extra_field | ||
| from airflow.utils.log.secrets_masker import mask_secret | ||
|
|
||
| if TYPE_CHECKING: | ||
| from slack_sdk.http_retry import RetryHandler | ||
| from slack_sdk.web.slack_response import SlackResponse | ||
|
|
||
|
|
||
| class SlackHook(BaseHook): | ||
| """ | ||
| Creates a Slack connection to be used for calls. | ||
| Creates a Slack API Connection to be used for calls. | ||
|
|
||
| This class provide a thin wrapper around the ``slack_sdk.WebClient``. | ||
|
|
||
| .. seealso:: | ||
| - :ref:`Slack API connection <howto/connection:slack>` | ||
| - https://api.slack.com/messaging | ||
| - https://slack.dev/python-slack-sdk/web/index.html | ||
|
|
||
| .. warning:: | ||
| This hook intend to use `Slack API` connection | ||
| and might not work correctly with `Slack Webhook` and `HTTP` connections. | ||
|
|
||
| Takes both Slack API token directly and connection that has Slack API token. If both are | ||
| supplied, Slack API token will be used. Also exposes the rest of slack.WebClient args. | ||
|
|
||
| Examples: | ||
| .. code-block:: python | ||
| .. code-block:: python | ||
|
|
||
| # Create hook | ||
| slack_hook = SlackHook(token="xxx") # or slack_hook = SlackHook(slack_conn_id="slack") | ||
| slack_hook = SlackHook(slack_conn_id="slack_api_default") | ||
|
|
||
| # Call generic API with parameters (errors are handled by hook) | ||
| # For more details check https://api.slack.com/methods/chat.postMessage | ||
|
|
@@ -45,28 +66,124 @@ class SlackHook(BaseHook): | |
| # For more details check https://slack.dev/python-slack-sdk/web/index.html#messaging | ||
| slack_hook.client.chat_postMessage(channel="#random", text="Hello world!") | ||
|
|
||
| :param token: Slack API token | ||
| :param slack_conn_id: :ref:`Slack connection id <howto/connection:slack>` | ||
| that has Slack API token in the password field. | ||
| :param use_session: A boolean specifying if the client should take advantage of | ||
| connection pooling. Default is True. | ||
| :param base_url: A string representing the Slack API base URL. Default is | ||
| ``https://www.slack.com/api/`` | ||
| :param timeout: The maximum number of seconds the client will wait | ||
| to connect and receive a response from Slack. Default is 30 seconds. | ||
| :param timeout: The maximum number of seconds the client will wait to connect | ||
| and receive a response from Slack. If not set than default WebClient value will use. | ||
| :param base_url: A string representing the Slack API base URL. | ||
| If not set than default WebClient BASE_URL will use (``https://www.slack.com/api/``). | ||
| :param proxy: Proxy to make the Slack Incoming Webhook call. | ||
| :param retry_handlers: List of handlers to customize retry logic in WebClient. | ||
| :param token: (deprecated) Slack API Token. | ||
| """ | ||
|
|
||
| conn_name_attr = 'slack_conn_id' | ||
| default_conn_name = 'slack_api_default' | ||
| conn_type = 'slack' | ||
| hook_name = 'Slack API' | ||
|
|
||
| def __init__( | ||
| self, | ||
| token: Optional[str] = None, | ||
| slack_conn_id: Optional[str] = None, | ||
| **client_args: Any, | ||
| base_url: Optional[str] = None, | ||
| timeout: Optional[int] = None, | ||
| proxy: Optional[str] = None, | ||
| retry_handlers: Optional[List["RetryHandler"]] = None, | ||
| **extra_client_args: Any, | ||
| ) -> None: | ||
| if not token and not slack_conn_id: | ||
| raise AirflowException("Either `slack_conn_id` or `token` should be provided.") | ||
| if token: | ||
| mask_secret(token) | ||
| warnings.warn( | ||
| "Provide token as hook argument deprecated by security reason and will be removed " | ||
| "in a future releases. Please specify token in `Slack API` connection.", | ||
| DeprecationWarning, | ||
| stacklevel=2, | ||
| ) | ||
| if not slack_conn_id: | ||
| warnings.warn( | ||
| "You have not set parameter `slack_conn_id`. Currently `Slack API` connection id optional " | ||
| "but in a future release it will mandatory.", | ||
| FutureWarning, | ||
| stacklevel=2, | ||
| ) | ||
|
|
||
| super().__init__() | ||
| self.token = self.__get_token(token, slack_conn_id) | ||
| self.client = WebClient(self.token, **client_args) | ||
| self._token = token | ||
| self.slack_conn_id = slack_conn_id | ||
| self.base_url = base_url | ||
| self.timeout = timeout | ||
| self.proxy = proxy | ||
| self.retry_handlers = retry_handlers | ||
| self.extra_client_args = extra_client_args | ||
| if self.extra_client_args.pop("use_session", None) is not None: | ||
| warnings.warn("`use_session` has no affect in slack_sdk.WebClient.", UserWarning, stacklevel=2) | ||
|
|
||
| @cached_property | ||
| def client(self) -> WebClient: | ||
| """Get the underlying slack_sdk.WebClient (cached).""" | ||
| return WebClient(**self._get_conn_params()) | ||
|
|
||
| def get_conn(self) -> WebClient: | ||
| """Get the underlying slack_sdk.WebClient (cached).""" | ||
| return self.client | ||
|
|
||
| def _get_conn_params(self) -> Dict[str, Any]: | ||
| """Fetch connection params as a dict and merge it with hook parameters.""" | ||
| conn = self.get_connection(self.slack_conn_id) if self.slack_conn_id else None | ||
| conn_params: Dict[str, Any] = {} | ||
|
|
||
| if self._token: | ||
| conn_params["token"] = self._token | ||
| elif conn: | ||
| if not conn.password: | ||
| raise AirflowNotFoundException( | ||
| f"Connection ID {self.slack_conn_id!r} does not contain password (Slack API Token)." | ||
| ) | ||
| conn_params["token"] = conn.password | ||
|
|
||
| extra_config = ConnectionExtraConfig( | ||
| conn_type=self.conn_type, | ||
| conn_id=conn.conn_id if conn else None, | ||
| extra=conn.extra_dejson if conn else {}, | ||
| ) | ||
|
|
||
| # Merge Hook parameters with Connection config | ||
| conn_params.update( | ||
| { | ||
| "timeout": self.timeout or extra_config.getint("timeout", default=None), | ||
| "base_url": self.base_url or extra_config.get("base_url", default=None), | ||
| "proxy": self.proxy or extra_config.get("proxy", default=None), | ||
| "retry_handlers": ( | ||
| self.retry_handlers or extra_config.getimports("retry_handlers", default=None) | ||
| ), | ||
| } | ||
| ) | ||
|
|
||
| # Add additional client args | ||
| conn_params.update(self.extra_client_args) | ||
| if "logger" not in conn_params: | ||
| conn_params["logger"] = self.log | ||
|
|
||
| return {k: v for k, v in conn_params.items() if v is not None} | ||
|
|
||
| @cached_property | ||
| def token(self) -> str: | ||
| warnings.warn( | ||
| "`SlackHook.token` property deprecated and will be removed in a future releases.", | ||
| DeprecationWarning, | ||
| stacklevel=2, | ||
| ) | ||
| return self._get_conn_params()["token"] | ||
|
|
||
| def __get_token(self, token: Any, slack_conn_id: Any) -> str: | ||
| warnings.warn( | ||
| "`SlackHook.__get_token` method deprecated and will be removed in a future releases.", | ||
| DeprecationWarning, | ||
| stacklevel=2, | ||
| ) | ||
|
||
| if token is not None: | ||
| return token | ||
|
|
||
|
|
@@ -79,7 +196,7 @@ def __get_token(self, token: Any, slack_conn_id: Any) -> str: | |
|
|
||
| raise AirflowException('Cannot get token: No valid Slack token nor slack_conn_id supplied.') | ||
|
|
||
| def call(self, api_method: str, **kwargs) -> SlackResponse: | ||
| def call(self, api_method: str, **kwargs) -> "SlackResponse": | ||
| """ | ||
| Calls Slack WebClient `WebClient.api_call` with given arguments. | ||
|
|
||
|
|
@@ -95,3 +212,78 @@ def call(self, api_method: str, **kwargs) -> SlackResponse: | |
| iterated on to execute subsequent requests. | ||
| """ | ||
| return self.client.api_call(api_method, **kwargs) | ||
|
|
||
| def test_connection(self): | ||
| """Tests the Slack API connection. | ||
|
|
||
| .. seealso:: | ||
| https://api.slack.com/methods/auth.test | ||
| """ | ||
| try: | ||
| response = self.call("auth.test") | ||
| response.validate() | ||
| except SlackApiError as e: | ||
| return False, str(e) | ||
| except Exception as e: | ||
| return False, f"Unknown error occurred while testing connection: {e}" | ||
|
|
||
| if isinstance(response.data, bytes): | ||
| # If response data binary then return simple message | ||
| return True, f"Connection successfully tested (url: {response.api_url})." | ||
|
|
||
| try: | ||
| return True, json.dumps(response.data) | ||
| except TypeError: | ||
| return True, str(response) | ||
|
|
||
| @classmethod | ||
| def get_connection_form_widgets(cls) -> Dict[str, Any]: | ||
| """Returns dictionary of widgets to be added for the hook to handle extra values.""" | ||
| from flask_appbuilder.fieldwidgets import BS3TextFieldWidget | ||
| from flask_babel import lazy_gettext | ||
| from wtforms import IntegerField, StringField | ||
|
|
||
| return { | ||
| prefixed_extra_field("timeout", cls.conn_type): IntegerField( | ||
| lazy_gettext("Timeout"), | ||
| widget=BS3TextFieldWidget(), | ||
| description="Optional. The maximum number of seconds the client will wait to connect " | ||
| "and receive a response from Slack API.", | ||
| ), | ||
| prefixed_extra_field("base_url", cls.conn_type): StringField( | ||
| lazy_gettext('Base URL'), | ||
| widget=BS3TextFieldWidget(), | ||
| description="Optional. A string representing the Slack API base URL.", | ||
| ), | ||
| prefixed_extra_field("proxy", cls.conn_type): StringField( | ||
| lazy_gettext('Proxy'), | ||
| widget=BS3TextFieldWidget(), | ||
| description="Optional. Proxy to make the Slack API call.", | ||
| ), | ||
| prefixed_extra_field("retry_handlers", cls.conn_type): StringField( | ||
| lazy_gettext('Retry Handlers'), | ||
| widget=BS3TextFieldWidget(), | ||
| description="Optional. Comma separated list of import paths to zero-argument callable " | ||
| "which returns retry handler for Slack WebClient.", | ||
| ), | ||
| } | ||
|
|
||
| @classmethod | ||
| def get_ui_field_behaviour(cls) -> Dict[str, Any]: | ||
| """Returns custom field behaviour.""" | ||
| return { | ||
| "hidden_fields": ["login", "port", "host", "schema", "extra"], | ||
| "relabeling": { | ||
| "password": "Slack API Token", | ||
| }, | ||
| "placeholders": { | ||
| "password": "xoxb-1234567890123-09876543210987-AbCdEfGhIjKlMnOpQrStUvWx", | ||
| prefixed_extra_field("timeout", cls.conn_type): "30", | ||
| prefixed_extra_field("base_url", cls.conn_type): "https://www.slack.com/api/", | ||
| prefixed_extra_field("proxy", cls.conn_type): "http://localhost:9000", | ||
| prefixed_extra_field("retry_handlers", cls.conn_type): ( | ||
| "slack_sdk.http_retry.builtin_handlers.ConnectionErrorRetryHandler," | ||
| "slack_sdk.http_retry.builtin_handlers.RateLimitErrorRetryHandler" | ||
| ), | ||
| }, | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This additional indent required to proper formatted block in documentation and PyCharm
Without additional space
https://airflow.apache.org/docs/apache-airflow-providers-slack/stable/_api/airflow/providers/slack/hooks/slack/index.html#airflow.providers.slack.hooks.slack.SlackHook
With additional space

Might be possible avoid this "hack"?