From 70537ac7a9115f2db6be495fb922a6a4819a0f72 Mon Sep 17 00:00:00 2001 From: David Blain Date: Thu, 11 Jan 2024 11:33:26 +0100 Subject: [PATCH 01/12] refactor: Pop non-header related parameters from the Connection extra_options which are used by the SimpleHttpOperator to avoid an InvalidHeader exception while instantiating the requests Session --- airflow/providers/http/hooks/http.py | 9 +++- tests/providers/http/hooks/test_http.py | 72 ++++++++++++++++++++++--- 2 files changed, 74 insertions(+), 7 deletions(-) diff --git a/airflow/providers/http/hooks/http.py b/airflow/providers/http/hooks/http.py index 1a8e5f7aebb43..cb203c0b44477 100644 --- a/airflow/providers/http/hooks/http.py +++ b/airflow/providers/http/hooks/http.py @@ -112,8 +112,15 @@ def get_conn(self, headers: dict[Any, Any] | None = None) -> requests.Session: elif self._auth_type: session.auth = self.auth_type() if conn.extra: + extra_options = conn.extra_dejson + extra_options.pop("timeout", None) + extra_options.pop("allow_redirects", None) + extra_options.pop("proxies", None) + extra_options.pop("verify", None) + extra_options.pop("cert", None) + try: - session.headers.update(conn.extra_dejson) + session.headers.update(extra_options) except TypeError: self.log.warning("Connection to %s has invalid extra field.", conn.host) if headers: diff --git a/tests/providers/http/hooks/test_http.py b/tests/providers/http/hooks/test_http.py index 345afa54dea8c..0de5faba5f3b6 100644 --- a/tests/providers/http/hooks/test_http.py +++ b/tests/providers/http/hooks/test_http.py @@ -46,17 +46,23 @@ def aioresponse(): yield async_response -def get_airflow_connection(unused_conn_id=None): - return Connection(conn_id="http_default", conn_type="http", host="test:8080/", extra='{"bearer": "test"}') +def get_airflow_connection(conn_id: str = "http_default"): + return Connection(conn_id=conn_id, conn_type="http", host="test:8080/", extra='{"bearer": "test"}') -def get_airflow_connection_with_port(unused_conn_id=None): - return
Connection(conn_id="http_default", conn_type="http", host="test.com", port=1234) +def get_airflow_connection_with_extra(extra: dict): + def inner(conn_id: str = "http_default"): + return Connection(conn_id=conn_id, conn_type="http", host="test:8080/", extra=json.dumps(extra)) + return inner -def get_airflow_connection_with_login_and_password(unused_conn_id=None): +def get_airflow_connection_with_port(conn_id: str = "http_default"): + return Connection(conn_id=conn_id, conn_type="http", host="test.com", port=1234) + + +def get_airflow_connection_with_login_and_password(conn_id: str = "http_default"): return Connection( - conn_id="http_default", conn_type="http", host="test.com", login="username", password="pass" + conn_id=conn_id, conn_type="http", host="test.com", login="username", password="pass" ) @@ -119,6 +125,60 @@ def test_hook_contains_header_from_extra_field(self): assert dict(conn.headers, **json.loads(expected_conn.extra)) == conn.headers assert conn.headers.get("bearer") == "test" + def test_hook_ignore_timeout_from_extra_field_as_header(self): + airflow_connection = get_airflow_connection_with_extra(extra={"bearer": "test", "timeout": 60}) + with mock.patch("airflow.hooks.base.BaseHook.get_connection", + side_effect=airflow_connection): + expected_conn = airflow_connection() + conn = self.get_hook.get_conn() + assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers + assert conn.headers.get("bearer") == "test" + assert conn.headers.get("timeout") is None + + def test_hook_ignore_allow_redirects_from_extra_field_as_header(self): + airflow_connection = get_airflow_connection_with_extra( + extra={"bearer": "test", "allow_redirects": False}) + with mock.patch("airflow.hooks.base.BaseHook.get_connection", + side_effect=airflow_connection): + expected_conn = airflow_connection() + conn = self.get_hook.get_conn() + assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers + assert conn.headers.get("bearer") == "test" + 
assert conn.headers.get("allow_redirects") is None + + def test_hook_ignore_proxies_from_extra_field_as_header(self): + airflow_connection = get_airflow_connection_with_extra( + extra={"bearer": "test", "proxies": {"http":"http://proxy:80", "https":"https://proxy:80"}}) + with mock.patch("airflow.hooks.base.BaseHook.get_connection", + side_effect=airflow_connection): + expected_conn = airflow_connection() + conn = self.get_hook.get_conn() + assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers + assert conn.headers.get("bearer") == "test" + assert conn.headers.get("proxies") is None + + def test_hook_ignore_verify_from_extra_field_as_header(self): + airflow_connection = get_airflow_connection_with_extra( + extra={"bearer": "test", "verify": False}) + with mock.patch("airflow.hooks.base.BaseHook.get_connection", + side_effect=airflow_connection): + expected_conn = airflow_connection() + conn = self.get_hook.get_conn() + assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers + assert conn.headers.get("bearer") == "test" + assert conn.headers.get("verify") is None + + def test_hook_ignore_cert_from_extra_field_as_header(self): + airflow_connection = get_airflow_connection_with_extra( + extra={"bearer": "test", "cert": "cert.crt"}) + with mock.patch("airflow.hooks.base.BaseHook.get_connection", + side_effect=airflow_connection): + expected_conn = airflow_connection() + conn = self.get_hook.get_conn() + assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers + assert conn.headers.get("bearer") == "test" + assert conn.headers.get("cert") is None + @mock.patch("requests.Request") def test_hook_with_method_in_lowercase(self, mock_requests): from requests.exceptions import InvalidURL, MissingSchema From bee9dc087a0e637341bd36849897a255ca55d658 Mon Sep 17 00:00:00 2001 From: David Blain Date: Thu, 11 Jan 2024 12:13:20 +0100 Subject: [PATCH 02/12] refactor: Forgot to assign non-header related parameters to pop 
to the instantiated request Session as default value --- airflow/providers/http/hooks/http.py | 11 ++++---- tests/providers/http/hooks/test_http.py | 35 ++++++++++++++++--------- 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/airflow/providers/http/hooks/http.py b/airflow/providers/http/hooks/http.py index cb203c0b44477..f833f1edb29b6 100644 --- a/airflow/providers/http/hooks/http.py +++ b/airflow/providers/http/hooks/http.py @@ -26,6 +26,7 @@ from aiohttp import ClientResponseError from asgiref.sync import sync_to_async from requests.auth import HTTPBasicAuth +from requests.models import DEFAULT_REDIRECT_LIMIT from requests_toolbelt.adapters.socket_options import TCPKeepAliveAdapter from airflow.exceptions import AirflowException @@ -113,11 +114,11 @@ def get_conn(self, headers: dict[Any, Any] | None = None) -> requests.Session: session.auth = self.auth_type() if conn.extra: extra_options = conn.extra_dejson - extra_options.pop("timeout", None) - extra_options.pop("allow_redirects", None) - extra_options.pop("proxies", None) - extra_options.pop("verify", None) - extra_options.pop("cert", None) + session.proxies = extra_options.pop("proxies", {}) + session.stream = extra_options.pop("stream", False) + session.verify = extra_options.pop("verify", True) + session.cert = extra_options.pop("cert", None) + session.max_redirects = extra_options.pop("max_redirects", DEFAULT_REDIRECT_LIMIT) try: session.headers.update(extra_options) diff --git a/tests/providers/http/hooks/test_http.py b/tests/providers/http/hooks/test_http.py index 0de5faba5f3b6..926811ab7ef8b 100644 --- a/tests/providers/http/hooks/test_http.py +++ b/tests/providers/http/hooks/test_http.py @@ -31,6 +31,7 @@ from aioresponses import aioresponses from requests.adapters import Response from requests.auth import AuthBase, HTTPBasicAuth +from requests.models import DEFAULT_REDIRECT_LIMIT from airflow.exceptions import AirflowException from airflow.models import Connection @@ -125,19 +126,9 @@ 
def test_hook_contains_header_from_extra_field(self): assert dict(conn.headers, **json.loads(expected_conn.extra)) == conn.headers assert conn.headers.get("bearer") == "test" - def test_hook_ignore_timeout_from_extra_field_as_header(self): - airflow_connection = get_airflow_connection_with_extra(extra={"bearer": "test", "timeout": 60}) - with mock.patch("airflow.hooks.base.BaseHook.get_connection", - side_effect=airflow_connection): - expected_conn = airflow_connection() - conn = self.get_hook.get_conn() - assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers - assert conn.headers.get("bearer") == "test" - assert conn.headers.get("timeout") is None - - def test_hook_ignore_allow_redirects_from_extra_field_as_header(self): + def test_hook_ignore_max_redirects_from_extra_field_as_header(self): airflow_connection = get_airflow_connection_with_extra( - extra={"bearer": "test", "allow_redirects": False}) + extra={"bearer": "test", "max_redirects": 3}) with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection): expected_conn = airflow_connection() @@ -145,6 +136,11 @@ def test_hook_ignore_allow_redirects_from_extra_field_as_header(self): assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers assert conn.headers.get("bearer") == "test" assert conn.headers.get("allow_redirects") is None + assert conn.proxies == {} + assert conn.stream is False + assert conn.verify is True + assert conn.cert is None + assert conn.max_redirects == 3 def test_hook_ignore_proxies_from_extra_field_as_header(self): airflow_connection = get_airflow_connection_with_extra( @@ -156,6 +152,11 @@ def test_hook_ignore_proxies_from_extra_field_as_header(self): assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers assert conn.headers.get("bearer") == "test" assert conn.headers.get("proxies") is None + assert conn.proxies == {"http":"http://proxy:80", "https":"https://proxy:80"} + assert conn.stream is 
False + assert conn.verify is True + assert conn.cert is None + assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT def test_hook_ignore_verify_from_extra_field_as_header(self): airflow_connection = get_airflow_connection_with_extra( @@ -167,6 +168,11 @@ def test_hook_ignore_verify_from_extra_field_as_header(self): assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers assert conn.headers.get("bearer") == "test" assert conn.headers.get("verify") is None + assert conn.proxies == {} + assert conn.stream is False + assert conn.verify is False + assert conn.cert is None + assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT def test_hook_ignore_cert_from_extra_field_as_header(self): airflow_connection = get_airflow_connection_with_extra( @@ -178,6 +184,11 @@ def test_hook_ignore_cert_from_extra_field_as_header(self): assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers assert conn.headers.get("bearer") == "test" assert conn.headers.get("cert") is None + assert conn.proxies == {} + assert conn.stream is False + assert conn.verify is True + assert conn.cert == "cert.crt" + assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT @mock.patch("requests.Request") def test_hook_with_method_in_lowercase(self, mock_requests): From a71470bad36669c202e72f6cb1936bc33e31673d Mon Sep 17 00:00:00 2001 From: David Blain Date: Thu, 11 Jan 2024 13:51:34 +0100 Subject: [PATCH 03/12] refactor: Also use the extra options from connections when using an AsyncHttpHook --- airflow/providers/http/hooks/http.py | 40 +++++++++--- tests/providers/http/hooks/test_http.py | 86 +++++++++++++++++++++++++ 2 files changed, 118 insertions(+), 8 deletions(-) diff --git a/airflow/providers/http/hooks/http.py b/airflow/providers/http/hooks/http.py index 9d3957e3e8960..b7d2f395fa3bb 100644 --- a/airflow/providers/http/hooks/http.py +++ b/airflow/providers/http/hooks/http.py @@ -31,6 +31,7 @@ from airflow.exceptions import AirflowException from airflow.hooks.base 
import BaseHook +from airflow.models import Connection if TYPE_CHECKING: from aiohttp.client_reqrep import ClientResponse @@ -114,15 +115,16 @@ def get_conn(self, headers: dict[Any, Any] | None = None) -> requests.Session: elif self._auth_type: session.auth = self.auth_type() if conn.extra: - extra_options = conn.extra_dejson - session.proxies = extra_options.pop("proxies", {}) - session.stream = extra_options.pop("stream", False) - session.verify = extra_options.pop("verify", True) - session.cert = extra_options.pop("cert", None) - session.max_redirects = extra_options.pop("max_redirects", DEFAULT_REDIRECT_LIMIT) + extra = conn.extra_dejson + extra.pop("allow_redirects", None) # ignore this as only max_redirects is accepted in Session + session.proxies = extra.pop("proxies", extra.pop("proxy", {})) + session.stream = extra.pop("stream", False) + session.verify = extra.pop("verify", extra.pop("verify_ssl", True)) + session.cert = extra.pop("cert", None) + session.max_redirects = extra.pop("max_redirects", DEFAULT_REDIRECT_LIMIT) try: - session.headers.update(extra_options) + session.headers.update(extra) except TypeError: self.log.warning("Connection to %s has invalid extra field.", conn.host) if headers: @@ -344,8 +346,10 @@ async def run( if conn.login: auth = self.auth_type(conn.login, conn.password) if conn.extra: + extra = self._process_extra_options_from_connection(conn=conn, extra_options=extra_options) + try: - _headers.update(conn.extra_dejson) + _headers.update(extra) except TypeError: self.log.warning("Connection to %s has invalid extra field.", conn.host) if headers: @@ -403,6 +407,26 @@ async def run( else: raise NotImplementedError # should not reach this, but makes mypy happy + @classmethod + def _process_extra_options_from_connection(cls, conn: Connection, extra_options: dict) -> dict: + extra = conn.extra_dejson + extra.pop("stream", None) + extra.pop("cert", None) + proxies = extra.pop("proxies", extra.pop("proxy", None)) + verify_ssl = 
extra.pop("verify", extra.pop("verify_ssl", None)) + allow_redirects = extra.pop("allow_redirects", None) + max_redirects = extra.pop("max_redirects", None) + + if proxies is not None and "proxy" not in extra_options: + extra_options["proxy"] = proxies + if verify_ssl is not None and "verify_ssl" not in extra_options: + extra_options["verify_ssl"] = verify_ssl + if allow_redirects is not None and "allow_redirects" not in extra_options: + extra_options["allow_redirects"] = allow_redirects + if max_redirects is not None and "max_redirects" not in extra_options: + extra_options["max_redirects"] = max_redirects + return extra + def _retryable_error_async(self, exception: ClientResponseError) -> bool: """Determine whether an exception may successful on a subsequent attempt. diff --git a/tests/providers/http/hooks/test_http.py b/tests/providers/http/hooks/test_http.py index 76ce3a468f52d..ca524b9e19cec 100644 --- a/tests/providers/http/hooks/test_http.py +++ b/tests/providers/http/hooks/test_http.py @@ -596,3 +596,89 @@ async def test_async_request_uses_connection_extra(self, aioresponse): assert all( key in headers and headers[key] == value for key, value in connection_extra.items() ) + + def test_process_extra_options_from_connection_when_stream_is_defined_just_ignore_it(self): + extra_options = {} + conn = get_airflow_connection_with_extra(extra={"bearer": "test", "stream": True})() + + actual = HttpAsyncHook._process_extra_options_from_connection(conn=conn, extra_options=extra_options) + + assert extra_options == {} + assert actual == {"bearer": "test"} + + def test_process_extra_options_from_connection_when_cert_is_defined_just_ignore_it(self): + extra_options = {} + conn = get_airflow_connection_with_extra(extra={"bearer": "test", "cert": "cert.crt"})() + + actual = HttpAsyncHook._process_extra_options_from_connection(conn=conn, extra_options=extra_options) + + assert extra_options == {} + assert actual == {"bearer": "test"} + + def 
test_process_extra_options_from_connection_when_proxies_is_defined(self): + extra_options = {} + conn = get_airflow_connection_with_extra( + extra={"bearer": "test", "proxies": {"http":"http://proxy:80", "https":"https://proxy:80"}} + )() + + actual = HttpAsyncHook._process_extra_options_from_connection(conn=conn, extra_options=extra_options) + + assert extra_options == {"proxy": {"http":"http://proxy:80", "https":"https://proxy:80"}} + assert actual == {"bearer": "test"} + + def test_process_extra_options_from_connection_when_proxy_is_defined(self): + extra_options = {} + conn = get_airflow_connection_with_extra( + extra={"bearer": "test", "proxy": {"http":"http://proxy:80", "https":"https://proxy:80"}} + )() + + actual = HttpAsyncHook._process_extra_options_from_connection(conn=conn, extra_options=extra_options) + + assert extra_options == {"proxy": {"http":"http://proxy:80", "https":"https://proxy:80"}} + assert actual == {"bearer": "test"} + + def test_process_extra_options_from_connection_when_verify_is_defined(self): + extra_options = {} + conn = get_airflow_connection_with_extra( + extra={"bearer": "test", "verify": False} + )() + + actual = HttpAsyncHook._process_extra_options_from_connection(conn=conn, extra_options=extra_options) + + assert extra_options == {"verify_ssl": False} + assert actual == {"bearer": "test"} + + def test_process_extra_options_from_connection_when_verify_ssl_is_defined(self): + extra_options = {} + conn = get_airflow_connection_with_extra( + extra={"bearer": "test", "verify_ssl": False} + )() + + actual = HttpAsyncHook._process_extra_options_from_connection(conn=conn, extra_options=extra_options) + + assert extra_options == {"verify_ssl": False} + assert actual == {"bearer": "test"} + + def test_process_extra_options_from_connection_when_allow_redirects_is_defined(self): + extra_options = {} + conn = get_airflow_connection_with_extra( + extra={"bearer": "test", "allow_redirects": False} + )() + + actual = 
HttpAsyncHook._process_extra_options_from_connection(conn=conn, extra_options=extra_options) + + assert extra_options == {"allow_redirects": False} + assert actual == {"bearer": "test"} + + def test_process_extra_options_from_connection_when_max_redirects_is_defined(self): + extra_options = {} + conn = get_airflow_connection_with_extra( + extra={"bearer": "test", "max_redirects": 3} + )() + + actual = HttpAsyncHook._process_extra_options_from_connection(conn=conn, extra_options=extra_options) + + assert extra_options == {"max_redirects": 3} + assert actual == {"bearer": "test"} + + From d69a5504634d5f99ff5742481567dcb283c8b803 Mon Sep 17 00:00:00 2001 From: David Blain Date: Thu, 11 Jan 2024 14:01:06 +0100 Subject: [PATCH 04/12] docs: Updated the HTTP Connection documentation concerning the optional Extra field --- .../apache-airflow-providers-http/connections/http.rst | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/docs/apache-airflow-providers-http/connections/http.rst b/docs/apache-airflow-providers-http/connections/http.rst index 41856cefee7d8..44f3703ebbbc9 100644 --- a/docs/apache-airflow-providers-http/connections/http.rst +++ b/docs/apache-airflow-providers-http/connections/http.rst @@ -54,7 +54,15 @@ Schema (optional) Specify the service type etc: http/https. Extras (optional) - Specify headers in json format. + Specify headers and default requests parameters in json format. + Following default requests parameters are taken into account: + * ``stream`` + * ``cert`` + * ``proxies or proxy`` + * ``verify or verify_ssl`` + * ``allow_redirects`` + * ``max_redirects`` + When specifying the connection in environment variable you should specify it using URI syntax. 
From b31a885c5a071197d50f162556416d1d5363c53d Mon Sep 17 00:00:00 2001 From: David Blain Date: Thu, 11 Jan 2024 14:44:56 +0100 Subject: [PATCH 05/12] refactor: Fixed static checks on test http module --- tests/providers/http/hooks/test_http.py | 40 +++++++++---------------- 1 file changed, 14 insertions(+), 26 deletions(-) diff --git a/tests/providers/http/hooks/test_http.py b/tests/providers/http/hooks/test_http.py index ca524b9e19cec..7562b9e13b787 100644 --- a/tests/providers/http/hooks/test_http.py +++ b/tests/providers/http/hooks/test_http.py @@ -62,9 +62,7 @@ def get_airflow_connection_with_port(conn_id: str = "http_default"): def get_airflow_connection_with_login_and_password(conn_id: str = "http_default"): - return Connection( - conn_id=conn_id, conn_type="http", host="test.com", login="username", password="pass" - ) + return Connection(conn_id=conn_id, conn_type="http", host="test.com", login="username", password="pass") class TestHttpHook: @@ -127,8 +125,7 @@ def test_hook_contains_header_from_extra_field(self): assert conn.headers.get("bearer") == "test" def test_hook_ignore_max_redirects_from_extra_field_as_header(self): - airflow_connection = get_airflow_connection_with_extra( - extra={"bearer": "test", "max_redirects": 3}) + airflow_connection = get_airflow_connection_with_extra(extra={"bearer": "test", "max_redirects": 3}) with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection): expected_conn = airflow_connection() @@ -144,7 +141,8 @@ def test_hook_ignore_max_redirects_from_extra_field_as_header(self): def test_hook_ignore_proxies_from_extra_field_as_header(self): airflow_connection = get_airflow_connection_with_extra( - extra={"bearer": "test", "proxies": {"http":"http://proxy:80", "https":"https://proxy:80"}}) + extra={"bearer": "test", "proxies": {"http":"http://proxy:80", "https":"https://proxy:80"}} + ) with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection): 
expected_conn = airflow_connection() @@ -159,8 +157,7 @@ def test_hook_ignore_proxies_from_extra_field_as_header(self): assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT def test_hook_ignore_verify_from_extra_field_as_header(self): - airflow_connection = get_airflow_connection_with_extra( - extra={"bearer": "test", "verify": False}) + airflow_connection = get_airflow_connection_with_extra(extra={"bearer": "test", "verify": False}) with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection): expected_conn = airflow_connection() @@ -175,8 +172,7 @@ def test_hook_ignore_verify_from_extra_field_as_header(self): assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT def test_hook_ignore_cert_from_extra_field_as_header(self): - airflow_connection = get_airflow_connection_with_extra( - extra={"bearer": "test", "cert": "cert.crt"}) + airflow_connection = get_airflow_connection_with_extra(extra={"bearer": "test", "cert": "cert.crt"}) with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection): expected_conn = airflow_connection() @@ -618,30 +614,28 @@ def test_process_extra_options_from_connection_when_cert_is_defined_just_ignore_ def test_process_extra_options_from_connection_when_proxies_is_defined(self): extra_options = {} conn = get_airflow_connection_with_extra( - extra={"bearer": "test", "proxies": {"http":"http://proxy:80", "https":"https://proxy:80"}} + extra={"bearer": "test", "proxies": {"http": "http://proxy:80", "https": "https://proxy:80"}} )() actual = HttpAsyncHook._process_extra_options_from_connection(conn=conn, extra_options=extra_options) - assert extra_options == {"proxy": {"http":"http://proxy:80", "https":"https://proxy:80"}} + assert extra_options == {"proxy": {"http": "http://proxy:80", "https": "https://proxy:80"}} assert actual == {"bearer": "test"} def test_process_extra_options_from_connection_when_proxy_is_defined(self): extra_options = {} conn = 
get_airflow_connection_with_extra( - extra={"bearer": "test", "proxy": {"http":"http://proxy:80", "https":"https://proxy:80"}} + extra={"bearer": "test", "proxy": {"http": "http://proxy:80", "https": "https://proxy:80"}} )() actual = HttpAsyncHook._process_extra_options_from_connection(conn=conn, extra_options=extra_options) - assert extra_options == {"proxy": {"http":"http://proxy:80", "https":"https://proxy:80"}} + assert extra_options == {"proxy": {"http": "http://proxy:80", "https": "https://proxy:80"}} assert actual == {"bearer": "test"} def test_process_extra_options_from_connection_when_verify_is_defined(self): extra_options = {} - conn = get_airflow_connection_with_extra( - extra={"bearer": "test", "verify": False} - )() + conn = get_airflow_connection_with_extra(extra={"bearer": "test", "verify": False})() actual = HttpAsyncHook._process_extra_options_from_connection(conn=conn, extra_options=extra_options) @@ -650,9 +644,7 @@ def test_process_extra_options_from_connection_when_verify_is_defined(self): def test_process_extra_options_from_connection_when_verify_ssl_is_defined(self): extra_options = {} - conn = get_airflow_connection_with_extra( - extra={"bearer": "test", "verify_ssl": False} - )() + conn = get_airflow_connection_with_extra(extra={"bearer": "test", "verify_ssl": False})() actual = HttpAsyncHook._process_extra_options_from_connection(conn=conn, extra_options=extra_options) @@ -661,9 +653,7 @@ def test_process_extra_options_from_connection_when_verify_ssl_is_defined(self): def test_process_extra_options_from_connection_when_allow_redirects_is_defined(self): extra_options = {} - conn = get_airflow_connection_with_extra( - extra={"bearer": "test", "allow_redirects": False} - )() + conn = get_airflow_connection_with_extra(extra={"bearer": "test", "allow_redirects": False})() actual = HttpAsyncHook._process_extra_options_from_connection(conn=conn, extra_options=extra_options) @@ -672,9 +662,7 @@ def 
test_process_extra_options_from_connection_when_allow_redirects_is_defined(s def test_process_extra_options_from_connection_when_max_redirects_is_defined(self): extra_options = {} - conn = get_airflow_connection_with_extra( - extra={"bearer": "test", "max_redirects": 3} - )() + conn = get_airflow_connection_with_extra(extra={"bearer": "test", "max_redirects": 3})() actual = HttpAsyncHook._process_extra_options_from_connection(conn=conn, extra_options=extra_options) From 3f74dd7d607af507a242ed666a2c46f4059ad3e7 Mon Sep 17 00:00:00 2001 From: David Blain Date: Thu, 11 Jan 2024 15:14:40 +0100 Subject: [PATCH 06/12] refactor: Also allow the definition of timeout as a request parameter in extra_options and added async test for AsyncHttpOperator --- airflow/providers/http/hooks/http.py | 4 ++++ tests/providers/http/hooks/test_http.py | 23 +++++++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/airflow/providers/http/hooks/http.py b/airflow/providers/http/hooks/http.py index b7d2f395fa3bb..3e6252ed3ea63 100644 --- a/airflow/providers/http/hooks/http.py +++ b/airflow/providers/http/hooks/http.py @@ -116,6 +116,7 @@ def get_conn(self, headers: dict[Any, Any] | None = None) -> requests.Session: session.auth = self.auth_type() if conn.extra: extra = conn.extra_dejson + extra.pop("timeout", None) # ignore this as timeout is only accepted in request method of Session extra.pop("allow_redirects", None) # ignore this as only max_redirects is accepted in Session session.proxies = extra.pop("proxies", extra.pop("proxy", {})) session.stream = extra.pop("stream", False) @@ -413,12 +414,15 @@ def _process_extra_options_from_connection(cls, conn: Connection, extra_options: extra.pop("stream", None) extra.pop("cert", None) proxies = extra.pop("proxies", extra.pop("proxy", None)) + timeout = extra.pop("timeout", None) verify_ssl = extra.pop("verify", extra.pop("verify_ssl", None)) allow_redirects = extra.pop("allow_redirects", None) max_redirects = 
extra.pop("max_redirects", None) if proxies is not None and "proxy" not in extra_options: extra_options["proxy"] = proxies + if timeout is not None and "timeout" not in extra_options: + extra_options["timeout"] = timeout if verify_ssl is not None and "verify_ssl" not in extra_options: extra_options["verify_ssl"] = verify_ssl if allow_redirects is not None and "allow_redirects" not in extra_options: diff --git a/tests/providers/http/hooks/test_http.py b/tests/providers/http/hooks/test_http.py index 7562b9e13b787..6e4a7d04ef593 100644 --- a/tests/providers/http/hooks/test_http.py +++ b/tests/providers/http/hooks/test_http.py @@ -593,6 +593,29 @@ async def test_async_request_uses_connection_extra(self, aioresponse): key in headers and headers[key] == value for key, value in connection_extra.items() ) + @pytest.mark.asyncio + async def test_async_request_uses_connection_extra_with_requests_parameters(self): + """Test api call asynchronously with a connection that has extra field.""" + connection_extra = {"bearer": "test"} + proxy = {"http": "http://proxy:80", "https": "https://proxy:80"} + airflow_connection = get_airflow_connection_with_extra( + extra={**connection_extra, **{"proxies": proxy, "timeout": 60, "verify": False, "allow_redirects": False, "max_redirects": 3}} + ) + + with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection): + hook = HttpAsyncHook() + with mock.patch("aiohttp.ClientSession.post", new_callable=mock.AsyncMock) as mocked_function: + await hook.run("v1/test") + headers = mocked_function.call_args.kwargs.get("headers") + assert all( + key in headers and headers[key] == value for key, value in connection_extra.items() + ) + assert mocked_function.call_args.kwargs.get("proxy") == proxy + assert mocked_function.call_args.kwargs.get("timeout") == 60 + assert mocked_function.call_args.kwargs.get("verify_ssl") is False + assert mocked_function.call_args.kwargs.get("allow_redirects") is False + assert 
mocked_function.call_args.kwargs.get("max_redirects") == 3 + def test_process_extra_options_from_connection_when_stream_is_defined_just_ignore_it(self): extra_options = {} conn = get_airflow_connection_with_extra(extra={"bearer": "test", "stream": True})() From f49fadc699ba63a90fb20beaccdf507bdbe9a63c Mon Sep 17 00:00:00 2001 From: David Blain Date: Thu, 11 Jan 2024 17:11:27 +0100 Subject: [PATCH 07/12] refactor: Fixed some formatting to make static checks happy --- airflow/providers/http/hooks/http.py | 4 +++- tests/providers/http/hooks/test_http.py | 29 ++++++++++++++----------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/airflow/providers/http/hooks/http.py b/airflow/providers/http/hooks/http.py index 3e6252ed3ea63..97d7bae334e51 100644 --- a/airflow/providers/http/hooks/http.py +++ b/airflow/providers/http/hooks/http.py @@ -116,7 +116,9 @@ def get_conn(self, headers: dict[Any, Any] | None = None) -> requests.Session: session.auth = self.auth_type() if conn.extra: extra = conn.extra_dejson - extra.pop("timeout", None) # ignore this as timeout is only accepted in request method of Session + extra.pop( + "timeout", None + ) # ignore this as timeout is only accepted in request method of Session extra.pop("allow_redirects", None) # ignore this as only max_redirects is accepted in Session session.proxies = extra.pop("proxies", extra.pop("proxy", {})) session.stream = extra.pop("stream", False) diff --git a/tests/providers/http/hooks/test_http.py b/tests/providers/http/hooks/test_http.py index 6e4a7d04ef593..f1748fec5e711 100644 --- a/tests/providers/http/hooks/test_http.py +++ b/tests/providers/http/hooks/test_http.py @@ -126,8 +126,7 @@ def test_hook_contains_header_from_extra_field(self): def test_hook_ignore_max_redirects_from_extra_field_as_header(self): airflow_connection = get_airflow_connection_with_extra(extra={"bearer": "test", "max_redirects": 3}) - with mock.patch("airflow.hooks.base.BaseHook.get_connection", - 
side_effect=airflow_connection): + with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection): expected_conn = airflow_connection() conn = self.get_hook.get_conn() assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers @@ -141,16 +140,15 @@ def test_hook_ignore_max_redirects_from_extra_field_as_header(self): def test_hook_ignore_proxies_from_extra_field_as_header(self): airflow_connection = get_airflow_connection_with_extra( - extra={"bearer": "test", "proxies": {"http":"http://proxy:80", "https":"https://proxy:80"}} + extra={"bearer": "test", "proxies": {"http": "http://proxy:80", "https": "https://proxy:80"}} ) - with mock.patch("airflow.hooks.base.BaseHook.get_connection", - side_effect=airflow_connection): + with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection): expected_conn = airflow_connection() conn = self.get_hook.get_conn() assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers assert conn.headers.get("bearer") == "test" assert conn.headers.get("proxies") is None - assert conn.proxies == {"http":"http://proxy:80", "https":"https://proxy:80"} + assert conn.proxies == {"http": "http://proxy:80", "https": "https://proxy:80"} assert conn.stream is False assert conn.verify is True assert conn.cert is None @@ -158,8 +156,7 @@ def test_hook_ignore_proxies_from_extra_field_as_header(self): def test_hook_ignore_verify_from_extra_field_as_header(self): airflow_connection = get_airflow_connection_with_extra(extra={"bearer": "test", "verify": False}) - with mock.patch("airflow.hooks.base.BaseHook.get_connection", - side_effect=airflow_connection): + with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection): expected_conn = airflow_connection() conn = self.get_hook.get_conn() assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers @@ -173,8 +170,7 @@ def 
test_hook_ignore_verify_from_extra_field_as_header(self): def test_hook_ignore_cert_from_extra_field_as_header(self): airflow_connection = get_airflow_connection_with_extra(extra={"bearer": "test", "cert": "cert.crt"}) - with mock.patch("airflow.hooks.base.BaseHook.get_connection", - side_effect=airflow_connection): + with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection): expected_conn = airflow_connection() conn = self.get_hook.get_conn() assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers @@ -599,7 +595,16 @@ async def test_async_request_uses_connection_extra_with_requests_parameters(self connection_extra = {"bearer": "test"} proxy = {"http": "http://proxy:80", "https": "https://proxy:80"} airflow_connection = get_airflow_connection_with_extra( - extra={**connection_extra, **{"proxies": proxy, "timeout": 60, "verify": False, "allow_redirects": False, "max_redirects": 3}} + extra={ + **connection_extra, + **{ + "proxies": proxy, + "timeout": 60, + "verify": False, + "allow_redirects": False, + "max_redirects": 3, + }, + } ) with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection): @@ -691,5 +696,3 @@ def test_process_extra_options_from_connection_when_max_redirects_is_defined(sel assert extra_options == {"max_redirects": 3} assert actual == {"bearer": "test"} - - From b7d64214c74987110db8f002989a8fae180978b8 Mon Sep 17 00:00:00 2001 From: David Blain Date: Thu, 11 Jan 2024 17:13:40 +0100 Subject: [PATCH 08/12] refactor: Removed indentation from Extras section --- .../connections/http.rst | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/apache-airflow-providers-http/connections/http.rst b/docs/apache-airflow-providers-http/connections/http.rst index 44f3703ebbbc9..6f1decdec9517 100644 --- a/docs/apache-airflow-providers-http/connections/http.rst +++ b/docs/apache-airflow-providers-http/connections/http.rst @@ -56,12 +56,12 @@ 
Schema (optional) Extras (optional) Specify headers and default requests parameters in json format. Following default requests parameters are taken into account: - * ``stream`` - * ``cert`` - * ``proxies or proxy`` - * ``verify or verify_ssl`` - * ``allow_redirects`` - * ``max_redirects`` + * ``stream`` + * ``cert`` + * ``proxies or proxy`` + * ``verify or verify_ssl`` + * ``allow_redirects`` + * ``max_redirects`` When specifying the connection in environment variable you should specify From 885d03e4609fdfd492328b74cbaf4a955028555f Mon Sep 17 00:00:00 2001 From: David Blain Date: Thu, 11 Jan 2024 17:41:44 +0100 Subject: [PATCH 09/12] refactor: Refactored different tests for the process_extra_options_from_connection into one test as suggested by aritra24 --- tests/providers/http/hooks/test_http.py | 87 ++++++------------------- 1 file changed, 19 insertions(+), 68 deletions(-) diff --git a/tests/providers/http/hooks/test_http.py b/tests/providers/http/hooks/test_http.py index f1748fec5e711..ed8769b2d6f48 100644 --- a/tests/providers/http/hooks/test_http.py +++ b/tests/providers/http/hooks/test_http.py @@ -621,78 +621,29 @@ async def test_async_request_uses_connection_extra_with_requests_parameters(self assert mocked_function.call_args.kwargs.get("allow_redirects") is False assert mocked_function.call_args.kwargs.get("max_redirects") == 3 - def test_process_extra_options_from_connection_when_stream_is_defined_just_ignore_it(self): - extra_options = {} - conn = get_airflow_connection_with_extra(extra={"bearer": "test", "stream": True})() - - actual = HttpAsyncHook._process_extra_options_from_connection(conn=conn, extra_options=extra_options) - - assert extra_options == {} - assert actual == {"bearer": "test"} - - def test_process_extra_options_from_connection_when_cert_is_defined_just_ignore_it(self): - extra_options = {} - conn = get_airflow_connection_with_extra(extra={"bearer": "test", "cert": "cert.crt"})() - - actual = 
HttpAsyncHook._process_extra_options_from_connection(conn=conn, extra_options=extra_options) - - assert extra_options == {} - assert actual == {"bearer": "test"} - - def test_process_extra_options_from_connection_when_proxies_is_defined(self): - extra_options = {} - conn = get_airflow_connection_with_extra( - extra={"bearer": "test", "proxies": {"http": "http://proxy:80", "https": "https://proxy:80"}} - )() - - actual = HttpAsyncHook._process_extra_options_from_connection(conn=conn, extra_options=extra_options) - - assert extra_options == {"proxy": {"http": "http://proxy:80", "https": "https://proxy:80"}} - assert actual == {"bearer": "test"} - - def test_process_extra_options_from_connection_when_proxy_is_defined(self): + def test_process_extra_options_from_connection(self): extra_options = {} + proxy = {"http": "http://proxy:80", "https": "https://proxy:80"} conn = get_airflow_connection_with_extra( - extra={"bearer": "test", "proxy": {"http": "http://proxy:80", "https": "https://proxy:80"}} + extra={ + "bearer": "test", + "stream": True, + "cert": "cert.crt", + "proxies": proxy, + "timeout": 60, + "verify": False, + "allow_redirects": False, + "max_redirects": 3, + } )() actual = HttpAsyncHook._process_extra_options_from_connection(conn=conn, extra_options=extra_options) - assert extra_options == {"proxy": {"http": "http://proxy:80", "https": "https://proxy:80"}} - assert actual == {"bearer": "test"} - - def test_process_extra_options_from_connection_when_verify_is_defined(self): - extra_options = {} - conn = get_airflow_connection_with_extra(extra={"bearer": "test", "verify": False})() - - actual = HttpAsyncHook._process_extra_options_from_connection(conn=conn, extra_options=extra_options) - - assert extra_options == {"verify_ssl": False} - assert actual == {"bearer": "test"} - - def test_process_extra_options_from_connection_when_verify_ssl_is_defined(self): - extra_options = {} - conn = get_airflow_connection_with_extra(extra={"bearer": "test", "verify_ssl": 
False})() - - actual = HttpAsyncHook._process_extra_options_from_connection(conn=conn, extra_options=extra_options) - - assert extra_options == {"verify_ssl": False} - assert actual == {"bearer": "test"} - - def test_process_extra_options_from_connection_when_allow_redirects_is_defined(self): - extra_options = {} - conn = get_airflow_connection_with_extra(extra={"bearer": "test", "allow_redirects": False})() - - actual = HttpAsyncHook._process_extra_options_from_connection(conn=conn, extra_options=extra_options) - - assert extra_options == {"allow_redirects": False} - assert actual == {"bearer": "test"} - - def test_process_extra_options_from_connection_when_max_redirects_is_defined(self): - extra_options = {} - conn = get_airflow_connection_with_extra(extra={"bearer": "test", "max_redirects": 3})() - - actual = HttpAsyncHook._process_extra_options_from_connection(conn=conn, extra_options=extra_options) - - assert extra_options == {"max_redirects": 3} + assert extra_options == { + "proxy": proxy, + "timeout": 60, + "verify_ssl": False, + "allow_redirects": False, + "max_redirects": 3, + } assert actual == {"bearer": "test"} From ec67880013841a4f87580665004ae259ad55d060 Mon Sep 17 00:00:00 2001 From: David Blain Date: Thu, 11 Jan 2024 17:52:51 +0100 Subject: [PATCH 10/12] refactor: Fixed formatting of get_airflow_connection_with_extra --- tests/providers/http/hooks/test_http.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/providers/http/hooks/test_http.py b/tests/providers/http/hooks/test_http.py index ed8769b2d6f48..7b093c66bbabc 100644 --- a/tests/providers/http/hooks/test_http.py +++ b/tests/providers/http/hooks/test_http.py @@ -54,6 +54,7 @@ def get_airflow_connection(conn_id: str = "http_default"): def get_airflow_connection_with_extra(extra: dict): def inner(conn_id: str = "http_default"): return Connection(conn_id=conn_id, conn_type="http", host="test:8080/", extra=json.dumps(extra)) + return inner From ab5c8fc883468b8a44f7552cfca3ab155509f59f Mon 
Sep 17 00:00:00 2001 From: David Blain Date: Fri, 12 Jan 2024 14:36:02 +0100 Subject: [PATCH 11/12] refactor: Moved import of Connection under type check --- airflow/providers/http/hooks/http.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow/providers/http/hooks/http.py b/airflow/providers/http/hooks/http.py index 97d7bae334e51..86786c15059f2 100644 --- a/airflow/providers/http/hooks/http.py +++ b/airflow/providers/http/hooks/http.py @@ -31,10 +31,11 @@ from airflow.exceptions import AirflowException from airflow.hooks.base import BaseHook -from airflow.models import Connection + if TYPE_CHECKING: from aiohttp.client_reqrep import ClientResponse + from airflow.models import Connection class HttpHook(BaseHook): From 36b62a4e6b040a432f3bc574386b15c706d23628 Mon Sep 17 00:00:00 2001 From: David Blain Date: Sat, 13 Jan 2024 17:52:21 +0100 Subject: [PATCH 12/12] refactor: Reformatted http hook --- airflow/providers/http/hooks/http.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/http/hooks/http.py b/airflow/providers/http/hooks/http.py index 86786c15059f2..7b98ec25dfdee 100644 --- a/airflow/providers/http/hooks/http.py +++ b/airflow/providers/http/hooks/http.py @@ -32,9 +32,9 @@ from airflow.exceptions import AirflowException from airflow.hooks.base import BaseHook - if TYPE_CHECKING: from aiohttp.client_reqrep import ClientResponse + from airflow.models import Connection