From 3938e16c58b2060aea13c12377f771cee1d320f5 Mon Sep 17 00:00:00 2001 From: "Majoros Donat (XC-DX/EET2-Bp)" Date: Mon, 14 Oct 2024 08:04:28 +0200 Subject: [PATCH 01/12] Add retry on error 502 and 504 --- airflow/api_internal/internal_api_call.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/airflow/api_internal/internal_api_call.py b/airflow/api_internal/internal_api_call.py index 8838377877bec..e3b186847d70d 100644 --- a/airflow/api_internal/internal_api_call.py +++ b/airflow/api_internal/internal_api_call.py @@ -39,6 +39,12 @@ logger = logging.getLogger(__name__) +class AirflowHttpException(AirflowException): + """Raise when there is a problem during an http request.""" + + def __init__(self, message: str, status_code: int): + super().__init__(message) + self.status_code = status_code class InternalApiConfig: """Stores and caches configuration for Internal API.""" @@ -104,11 +110,18 @@ def internal_api_call(func: Callable[PS, RT]) -> Callable[PS, RT]: for more information . """ from requests.exceptions import ConnectionError - + from http import HTTPStatus + + def is_retryable_exception(exception: Exception) -> bool: + retryable_status_codes = [HTTPStatus.BAD_GATEWAY, HTTPStatus.GATEWAY_TIMEOUT] + return (isinstance(exception, AirflowHttpException) + and exception.status_code in retryable_status_codes + or isinstance(exception, (ConnectionError, NewConnectionError))) + @tenacity.retry( stop=tenacity.stop_after_attempt(10), wait=tenacity.wait_exponential(min=1), - retry=tenacity.retry_if_exception_type((NewConnectionError, ConnectionError)), + retry=tenacity.retry_if_exception(is_retryable_exception), before_sleep=tenacity.before_log(logger, logging.WARNING), ) def make_jsonrpc_request(method_name: str, params_json: str) -> bytes: @@ -126,9 +139,10 @@ def make_jsonrpc_request(method_name: str, params_json: str) -> bytes: internal_api_endpoint = InternalApiConfig.get_internal_api_endpoint() response = requests.post(url=internal_api_endpoint, data=json.dumps(data), headers=headers) if response.status_code != 200: - raise AirflowException( + raise AirflowHttpException( f"Got {response.status_code}:{response.reason} when sending " - f"the internal api request: {response.text}" + f"the internal api request: {response.text}", + response.status_code ) return response.content From bb2f848c884c55076ac63a8a12380a0d8aa9647f Mon Sep 17 00:00:00 2001 From: "Majoros Donat (XC-DX/EET2-Bp)" Date: Mon, 14 Oct 2024 09:27:02 +0200 Subject: [PATCH 02/12] fix mypy findings --- airflow/api_internal/internal_api_call.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/airflow/api_internal/internal_api_call.py b/airflow/api_internal/internal_api_call.py index e3b186847d70d..5f46530bb4737 100644 --- a/airflow/api_internal/internal_api_call.py +++ b/airflow/api_internal/internal_api_call.py @@ -21,6 +21,7 @@ import json import logging from functools import wraps +from http import HTTPStatus from typing import Callable, TypeVar from urllib.parse import urlparse @@ -42,7 +43,7 @@ class AirflowHttpException(AirflowException): """Raise when there is a problem during an http request.""" - def __init__(self, message: str, status_code: int): + def __init__(self, message: str, status_code: int | HTTPStatus): super().__init__(message) self.status_code = status_code @@ -109,14 +110,16 @@ def internal_api_call(func: Callable[PS, RT]) -> Callable[PS, RT]: See [AIP-44](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API) for more information . """ + from requests.exceptions import ConnectionError - from http import HTTPStatus - def is_retryable_exception(exception: Exception) -> bool: - retryable_status_codes = [HTTPStatus.BAD_GATEWAY, HTTPStatus.GATEWAY_TIMEOUT] - return (isinstance(exception, AirflowHttpException) - and exception.status_code in retryable_status_codes - or isinstance(exception, (ConnectionError, NewConnectionError))) + def is_retryable_exception(exception: BaseException) -> bool: + retryable_status_codes = (HTTPStatus.BAD_GATEWAY, HTTPStatus.GATEWAY_TIMEOUT) + return ( + isinstance(exception, AirflowHttpException) + and exception.status_code in retryable_status_codes + or isinstance(exception, (ConnectionError, NewConnectionError)) + ) @tenacity.retry( stop=tenacity.stop_after_attempt(10), @@ -142,7 +145,7 @@ def make_jsonrpc_request(method_name: str, params_json: str) -> bytes: raise AirflowHttpException( f"Got {response.status_code}:{response.reason} when sending " f"the internal api request: {response.text}", - response.status_code + response.status_code, ) return response.content From 3b6c4743abf75f9fe94362301acea1dbac7bf7b2 Mon Sep 17 00:00:00 2001 From: "Majoros Donat (XC-DX/EET2-Bp)" Date: Mon, 14 Oct 2024 14:46:00 +0200 Subject: [PATCH 03/12] Add pytest --- tests/api_internal/test_internal_api_call.py | 24 +++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/tests/api_internal/test_internal_api_call.py b/tests/api_internal/test_internal_api_call.py index 02ae2d9f55125..2a2a5869ec929 100644 --- a/tests/api_internal/test_internal_api_call.py +++ b/tests/api_internal/test_internal_api_call.py @@ -27,7 +27,7 @@ import requests from airflow.__main__ import configure_internal_api -from airflow.api_internal.internal_api_call import InternalApiConfig, internal_api_call +from airflow.api_internal.internal_api_call import AirflowHttpException, InternalApiConfig, internal_api_call from airflow.configuration import conf from airflow.models.taskinstance import TaskInstance from airflow.operators.empty import EmptyOperator @@ -267,6 +267,28 @@ def test_remote_classmethod_call_with_params(self, mock_requests): assert call_kwargs["headers"]["Content-Type"] == "application/json" assert "Authorization" in call_kwargs["headers"] + @conf_vars( + { + ("core", "database_access_isolation"): "true", + ("core", "internal_api_url"): "http://localhost:8888", + ("database", "sql_alchemy_conn"): "none://", + } + ) + @mock.patch("airflow.api_internal.internal_api_call.requests") + @mock.patch("tenacity.wait_exponential") + def test_retry_on_bad_gateway(self, mock_requests, mock_wait): + configure_internal_api(Namespace(subcommand="dag-processor"), conf) + response = requests.Response() + response.status_code = 502 + response.reason = "Bad Gateway" + mock_wait = lambda *_, **__: None + response._content = b"Bad Gateway" + + mock_requests.post.return_value = response + with pytest.raises(AirflowHttpException): + result = TestInternalApiCall.fake_method_with_params("fake-dag", task_id=123, session="session") + assert mock_requests.post.call_count == 10 + @conf_vars( { ("core", "database_access_isolation"): "true", From e3b4fdda9d336161dbefa1bdb09d32565b38aea1 Mon Sep 17 00:00:00 2001 From: "Majoros Donat (XC-DX/EET2-Bp)" Date: Tue, 15 Oct 2024 07:37:13 +0200 Subject: [PATCH 04/12] Convert response code to HTTPStatus --- airflow/api_internal/internal_api_call.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow/api_internal/internal_api_call.py b/airflow/api_internal/internal_api_call.py index 5f46530bb4737..935b683507320 100644 --- a/airflow/api_internal/internal_api_call.py +++ b/airflow/api_internal/internal_api_call.py @@ -43,7 +43,7 @@ class AirflowHttpException(AirflowException): """Raise when there is a problem during an http request.""" - def __init__(self, message: str, status_code: int | HTTPStatus): + def __init__(self, message: str, status_code: HTTPStatus): super().__init__(message) self.status_code = status_code @@ -116,7 +116,7 @@ def internal_api_call(func: Callable[PS, RT]) -> Callable[PS, RT]: def is_retryable_exception(exception: BaseException) -> bool: retryable_status_codes = (HTTPStatus.BAD_GATEWAY, HTTPStatus.GATEWAY_TIMEOUT) return ( - isinstance(exception, AirflowHttpException) + isinstance(exception, AirflowHttpException) and exception.status_code in retryable_status_codes or isinstance(exception, (ConnectionError, NewConnectionError)) ) @@ -145,7 +145,7 @@ def make_jsonrpc_request(method_name: str, params_json: str) -> bytes: raise AirflowHttpException( f"Got {response.status_code}:{response.reason} when sending " f"the internal api request: {response.text}", - response.status_code, + HTTPStatus(response.status_code), ) return response.content From ed076d55b7910858cd14dc71ac20f161aec2f2f3 Mon Sep 17 00:00:00 2001 From: "Majoros Donat (XC-DX/EET2-Bp)" Date: Tue, 15 Oct 2024 07:38:38 +0200 Subject: [PATCH 05/12] Add docs to retriable exception --- airflow/api_internal/internal_api_call.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/airflow/api_internal/internal_api_call.py b/airflow/api_internal/internal_api_call.py index 935b683507320..0616b35b93f08 100644 --- a/airflow/api_internal/internal_api_call.py +++ b/airflow/api_internal/internal_api_call.py @@ -113,18 +113,21 @@ def internal_api_call(func: Callable[PS, RT]) -> Callable[PS, RT]: from requests.exceptions import ConnectionError - def is_retryable_exception(exception: BaseException) -> bool: - retryable_status_codes = (HTTPStatus.BAD_GATEWAY, HTTPStatus.GATEWAY_TIMEOUT) - return ( - isinstance(exception, AirflowHttpException) - and exception.status_code in retryable_status_codes - or isinstance(exception, (ConnectionError, NewConnectionError)) - ) + def _is_retryable_exception(exception: BaseException) -> bool: + """ + Evaluates which exception types should be retried. + + This is especially demanded for cases where an application gateway or Kubernetes ingress can + not find a running instance of a webserver hostring the API (HTTP 502+504) or when the + HTTP request fails in general on network level. + + Note that we want to fail on other general errors on the webserver not to send bad requests in an endless loop. + """ @tenacity.retry( stop=tenacity.stop_after_attempt(10), wait=tenacity.wait_exponential(min=1), - retry=tenacity.retry_if_exception(is_retryable_exception), + retry=tenacity.retry_if_exception(_is_retryable_exception), before_sleep=tenacity.before_log(logger, logging.WARNING), ) def make_jsonrpc_request(method_name: str, params_json: str) -> bytes: From 589b37bb1c9c7dd7bbc290418315b902e86a8147 Mon Sep 17 00:00:00 2001 From: "Majoros Donat (XC-DX/EET2-Bp)" Date: Tue, 15 Oct 2024 07:39:22 +0200 Subject: [PATCH 06/12] extend docs for AirflowHttpException --- airflow/api_internal/internal_api_call.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/api_internal/internal_api_call.py b/airflow/api_internal/internal_api_call.py index 0616b35b93f08..69bc12fa80614 100644 --- a/airflow/api_internal/internal_api_call.py +++ b/airflow/api_internal/internal_api_call.py @@ -41,7 +41,7 @@ logger = logging.getLogger(__name__) class AirflowHttpException(AirflowException): - """Raise when there is a problem during an http request.""" + """Raise when there is a problem during an http request on the internal API decorator.""" def __init__(self, message: str, status_code: HTTPStatus): super().__init__(message) From 29fdb5a1b62843f4aee7e5f0c6294bfa91f3e2dc Mon Sep 17 00:00:00 2001 From: "Majoros Donat (XC-DX/EET2-Bp)" Date: Tue, 15 Oct 2024 07:53:42 +0200 Subject: [PATCH 07/12] Fix syntax and typos --- airflow/api_internal/internal_api_call.py | 25 +++++++++++++++-------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/airflow/api_internal/internal_api_call.py b/airflow/api_internal/internal_api_call.py index 69bc12fa80614..b40e4653cf862 100644 --- a/airflow/api_internal/internal_api_call.py +++ b/airflow/api_internal/internal_api_call.py @@ -114,15 +114,22 @@ def internal_api_call(func: Callable[PS, RT]) -> Callable[PS, RT]: from requests.exceptions import ConnectionError def _is_retryable_exception(exception: BaseException) -> bool: - """ - Evaluates which exception types should be retried. - - This is especially demanded for cases where an application gateway or Kubernetes ingress can - not find a running instance of a webserver hostring the API (HTTP 502+504) or when the - HTTP request fails in general on network level. - - Note that we want to fail on other general errors on the webserver not to send bad requests in an endless loop. - """ + """ + Evaluates which exception types should be retried. + + This is especially demanded for cases where an application gateway or Kubernetes ingress can + not find a running instance of a webserver hosting the API (HTTP 502+504) or when the + HTTP request fails in general on network level. + + Note that we want to fail on other general errors on the webserver not to send bad requests in an endless loop. + """ + + retryable_status_codes = (HTTPStatus.BAD_GATEWAY, HTTPStatus.GATEWAY_TIMEOUT) + return ( + isinstance(exception, AirflowHttpException) + and exception.status_code in retryable_status_codes + or isinstance(exception, (ConnectionError, NewConnectionError)) + ) @tenacity.retry( stop=tenacity.stop_after_attempt(10), From e27603500bb164c1786003418bb9b3c90bb94a86 Mon Sep 17 00:00:00 2001 From: "Majoros Donat (XC-DX/EET2-Bp)" Date: Tue, 15 Oct 2024 11:02:34 +0200 Subject: [PATCH 08/12] fix pytest --- tests/api_internal/test_internal_api_call.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/tests/api_internal/test_internal_api_call.py b/tests/api_internal/test_internal_api_call.py index 2a2a5869ec929..27e22b8c97e3d 100644 --- a/tests/api_internal/test_internal_api_call.py +++ b/tests/api_internal/test_internal_api_call.py @@ -21,6 +21,7 @@ import json from argparse import Namespace from typing import TYPE_CHECKING +from tenacity import RetryError from unittest import mock import pytest @@ -275,19 +276,19 @@ def test_remote_classmethod_call_with_params(self, mock_requests): } ) @mock.patch("airflow.api_internal.internal_api_call.requests") - @mock.patch("tenacity.wait_exponential") - def test_retry_on_bad_gateway(self, mock_requests, mock_wait): + @mock.patch("tenacity.time.sleep") + def test_retry_on_bad_gateway(self, mock_sleep, mock_requests): configure_internal_api(Namespace(subcommand="dag-processor"), conf) response = requests.Response() response.status_code = 502 response.reason = "Bad Gateway" - mock_wait = lambda *_, **__: None response._content = b"Bad Gateway" - + + mock_sleep = lambda *_, **__: None mock_requests.post.return_value = response - with pytest.raises(AirflowHttpException): + with pytest.raises(RetryError): result = TestInternalApiCall.fake_method_with_params("fake-dag", task_id=123, session="session") - assert mock_requests.post.call_count == 10 + assert mock_requests.post.call_count == 10 @conf_vars( { From 4343d318ef067eb75e8ca3b3abad9a1aed4daeef Mon Sep 17 00:00:00 2001 From: "Majoros Donat (XC-DX/EET2-Bp)" Date: Tue, 15 Oct 2024 11:28:52 +0200 Subject: [PATCH 09/12] fix static checks --- airflow/api_internal/internal_api_call.py | 2 +- tests/api_internal/test_internal_api_call.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow/api_internal/internal_api_call.py b/airflow/api_internal/internal_api_call.py index b40e4653cf862..03161cb64a585 100644 --- a/airflow/api_internal/internal_api_call.py +++ b/airflow/api_internal/internal_api_call.py @@ -115,7 +115,7 @@ def internal_api_call(func: Callable[PS, RT]) -> Callable[PS, RT]: def _is_retryable_exception(exception: BaseException) -> bool: """ - Evaluates which exception types should be retried. + Evaluate which exception types to retry. This is especially demanded for cases where an application gateway or Kubernetes ingress can not find a running instance of a webserver hosting the API (HTTP 502+504) or when the diff --git a/tests/api_internal/test_internal_api_call.py b/tests/api_internal/test_internal_api_call.py index 27e22b8c97e3d..134c1f8091ad2 100644 --- a/tests/api_internal/test_internal_api_call.py +++ b/tests/api_internal/test_internal_api_call.py @@ -21,14 +21,14 @@ import json from argparse import Namespace from typing import TYPE_CHECKING -from tenacity import RetryError from unittest import mock import pytest import requests +from tenacity import RetryError from airflow.__main__ import configure_internal_api -from airflow.api_internal.internal_api_call import AirflowHttpException, InternalApiConfig, internal_api_call +from airflow.api_internal.internal_api_call import InternalApiConfig, internal_api_call from airflow.configuration import conf from airflow.models.taskinstance import TaskInstance from airflow.operators.empty import EmptyOperator From 47904947cdfb66a4006ea5c2ea0e2e4c5eeec5b5 Mon Sep 17 00:00:00 2001 From: "Majoros Donat (XC-DX/EET2-Bp)" Date: Tue, 15 Oct 2024 12:08:52 +0200 Subject: [PATCH 10/12] fix some static checks --- tests/api_internal/test_internal_api_call.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/api_internal/test_internal_api_call.py b/tests/api_internal/test_internal_api_call.py index 134c1f8091ad2..35b9bc5e20f53 100644 --- a/tests/api_internal/test_internal_api_call.py +++ b/tests/api_internal/test_internal_api_call.py @@ -283,7 +283,7 @@ def test_retry_on_bad_gateway(self, mock_sleep, mock_requests): response.status_code = 502 response.reason = "Bad Gateway" response._content = b"Bad Gateway" - + mock_sleep = lambda *_, **__: None mock_requests.post.return_value = response with pytest.raises(RetryError): From ec9fadb2fe551b146b8c9bd287a97af7905af45f Mon Sep 17 00:00:00 2001 From: "Majoros Donat (XC-DX/EET2-Bp)" Date: Tue, 15 Oct 2024 13:53:10 +0200 Subject: [PATCH 11/12] Fix ruff --- tests/api_internal/test_internal_api_call.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/api_internal/test_internal_api_call.py b/tests/api_internal/test_internal_api_call.py index 35b9bc5e20f53..ff071300934a0 100644 --- a/tests/api_internal/test_internal_api_call.py +++ b/tests/api_internal/test_internal_api_call.py @@ -284,10 +284,10 @@ def test_retry_on_bad_gateway(self, mock_sleep, mock_requests): response.reason = "Bad Gateway" response._content = b"Bad Gateway" - mock_sleep = lambda *_, **__: None + mock_sleep = lambda *_, **__: None # noqa: F841 mock_requests.post.return_value = response with pytest.raises(RetryError): - result = TestInternalApiCall.fake_method_with_params("fake-dag", task_id=123, session="session") + TestInternalApiCall.fake_method_with_params("fake-dag", task_id=123, session="session") assert mock_requests.post.call_count == 10 @conf_vars( From f160ef2dce07eb39df48146f75e7360eb4ff66e4 Mon Sep 17 00:00:00 2001 From: "Majoros Donat (XC-DX/EET2-Bp)" Date: Tue, 15 Oct 2024 16:07:07 +0200 Subject: [PATCH 12/12] fix pre-commit --- airflow/api_internal/internal_api_call.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/airflow/api_internal/internal_api_call.py b/airflow/api_internal/internal_api_call.py index 03161cb64a585..064834d7c8673 100644 --- a/airflow/api_internal/internal_api_call.py +++ b/airflow/api_internal/internal_api_call.py @@ -40,6 +40,7 @@ logger = logging.getLogger(__name__) + class AirflowHttpException(AirflowException): """Raise when there is a problem during an http request on the internal API decorator.""" @@ -47,6 +48,7 @@ def __init__(self, message: str, status_code: HTTPStatus): super().__init__(message) self.status_code = status_code + class InternalApiConfig: """Stores and caches configuration for Internal API.""" @@ -110,27 +112,25 @@ def internal_api_call(func: Callable[PS, RT]) -> Callable[PS, RT]: See [AIP-44](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API) for more information . """ - from requests.exceptions import ConnectionError def _is_retryable_exception(exception: BaseException) -> bool: """ Evaluate which exception types to retry. - + This is especially demanded for cases where an application gateway or Kubernetes ingress can not find a running instance of a webserver hosting the API (HTTP 502+504) or when the HTTP request fails in general on network level. - + Note that we want to fail on other general errors on the webserver not to send bad requests in an endless loop. """ - retryable_status_codes = (HTTPStatus.BAD_GATEWAY, HTTPStatus.GATEWAY_TIMEOUT) return ( isinstance(exception, AirflowHttpException) and exception.status_code in retryable_status_codes or isinstance(exception, (ConnectionError, NewConnectionError)) ) - + @tenacity.retry( stop=tenacity.stop_after_attempt(10), wait=tenacity.wait_exponential(min=1),