From 5047c6bec86cdd161c3f43141e041b1b4c234a9a Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sun, 1 Dec 2024 16:42:11 +0100 Subject: [PATCH 1/6] Make Edge API retries configurable --- providers/src/airflow/providers/edge/CHANGELOG.rst | 8 ++++++++ providers/src/airflow/providers/edge/__init__.py | 2 +- .../src/airflow/providers/edge/cli/api_client.py | 14 ++++++++++++-- providers/src/airflow/providers/edge/provider.yaml | 2 +- 4 files changed, 22 insertions(+), 4 deletions(-) diff --git a/providers/src/airflow/providers/edge/CHANGELOG.rst b/providers/src/airflow/providers/edge/CHANGELOG.rst index af4df15918cd3..18a1bc8db65d1 100644 --- a/providers/src/airflow/providers/edge/CHANGELOG.rst +++ b/providers/src/airflow/providers/edge/CHANGELOG.rst @@ -27,6 +27,14 @@ Changelog --------- +0.9.7pre0 +......... + +* ``Make API retries configurable via ENV. Connection loss is sustained for 5min by default.`` + +Misc +~~~~ + 0.9.6pre0 ......... diff --git a/providers/src/airflow/providers/edge/__init__.py b/providers/src/airflow/providers/edge/__init__.py index 7c0490c20785e..9c2324041e8a0 100644 --- a/providers/src/airflow/providers/edge/__init__.py +++ b/providers/src/airflow/providers/edge/__init__.py @@ -29,7 +29,7 @@ __all__ = ["__version__"] -__version__ = "0.9.6pre0" +__version__ = "0.9.7pre0" if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse( "2.10.0" diff --git a/providers/src/airflow/providers/edge/cli/api_client.py b/providers/src/airflow/providers/edge/cli/api_client.py index 483c5ab3759e5..50f5171615de8 100644 --- a/providers/src/airflow/providers/edge/cli/api_client.py +++ b/providers/src/airflow/providers/edge/cli/api_client.py @@ -18,6 +18,7 @@ import json import logging +import os from datetime import datetime from http import HTTPStatus from pathlib import Path @@ -47,6 +48,13 @@ logger = logging.getLogger(__name__) +# Hidden config options for Edge Worker how retries on HTTP requests should be handled +# Note: Given defaults make attempts after 1, 3, 7, 15, 31seconds, 1:03, 2:07, 3:37 and fails after 5:07min +AIRFLOW__EDGE__API_RETRIES = int(os.getenv("AIRFLOW__EDGE__API_RETRIES", 10)) +AIRFLOW__EDGE__API_RETRY_WAIT_MIN = int(os.getenv("AIRFLOW__EDGE__API_RETRY_WAIT_MIN", 1)) +AIRFLOW__EDGE__API_RETRY_WAIT_MAX = int(os.getenv("AIRFLOW__EDGE__API_RETRY_WAIT_MAX", 90)) + + def _is_retryable_exception(exception: BaseException) -> bool: """ Evaluate which exception types to retry. @@ -66,8 +74,10 @@ def _is_retryable_exception(exception: BaseException) -> bool: @tenacity.retry( - stop=tenacity.stop_after_attempt(10), # TODO: Make this configurable - wait=tenacity.wait_exponential(min=1), # TODO: Make this configurable + stop=tenacity.stop_after_attempt(AIRFLOW__EDGE__API_RETRIES), + wait=tenacity.wait_exponential( + min=AIRFLOW__EDGE__API_RETRY_WAIT_MIN, max=AIRFLOW__EDGE__API_RETRY_WAIT_MAX + ), retry=tenacity.retry_if_exception(_is_retryable_exception), before_sleep=tenacity.before_log(logger, logging.WARNING), ) diff --git a/providers/src/airflow/providers/edge/provider.yaml b/providers/src/airflow/providers/edge/provider.yaml index f6b0457c07d7e..aa48ce77d5c35 100644 --- a/providers/src/airflow/providers/edge/provider.yaml +++ b/providers/src/airflow/providers/edge/provider.yaml @@ -27,7 +27,7 @@ source-date-epoch: 1729683247 # note that those versions are maintained by release manager - do not update them manually versions: - - 0.9.6pre0 + - 0.9.7pre0 dependencies: - apache-airflow>=2.10.0 From 687cb94b8d4c81a806e5edc82def4372eba441c9 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 28 Dec 2024 15:37:12 +0100 Subject: [PATCH 2/6] Align implementation with TaskSDK PR #45121 --- generated/provider_dependencies.json | 3 +- .../airflow/providers/edge/cli/api_client.py | 53 ++++++++----------- .../src/airflow/providers/edge/provider.yaml | 1 + 3 files changed, 25 insertions(+), 32 deletions(-) diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index f58c010b3f58e..d2f9a81d7441b 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -528,7 +528,8 @@ "edge": { "deps": [ "apache-airflow>=2.10.0", - "pydantic>=2.10.2" + "pydantic>=2.10.2", + "retryhttp>=1.2.0" ], "devel-deps": [], "plugins": [ diff --git a/providers/src/airflow/providers/edge/cli/api_client.py b/providers/src/airflow/providers/edge/cli/api_client.py index 50f5171615de8..174e9d9216bfd 100644 --- a/providers/src/airflow/providers/edge/cli/api_client.py +++ b/providers/src/airflow/providers/edge/cli/api_client.py @@ -26,9 +26,8 @@ from urllib.parse import quote, urljoin import requests -import tenacity -from requests.exceptions import ConnectionError -from urllib3.exceptions import NewConnectionError +from retryhttp import retry, wait_retry_after +from tenacity import before_log, wait_random_exponential from airflow.configuration import conf from airflow.exceptions import AirflowException @@ -50,36 +49,28 @@ # Hidden config options for Edge Worker how retries on HTTP requests should be handled # Note: Given defaults make attempts after 1, 3, 7, 15, 31seconds, 1:03, 2:07, 3:37 and fails after 5:07min -AIRFLOW__EDGE__API_RETRIES = int(os.getenv("AIRFLOW__EDGE__API_RETRIES", 10)) -AIRFLOW__EDGE__API_RETRY_WAIT_MIN = int(os.getenv("AIRFLOW__EDGE__API_RETRY_WAIT_MIN", 1)) -AIRFLOW__EDGE__API_RETRY_WAIT_MAX = int(os.getenv("AIRFLOW__EDGE__API_RETRY_WAIT_MAX", 90)) - - -def _is_retryable_exception(exception: BaseException) -> bool: - """ - Evaluate which exception types to retry. - - This is especially demanded for cases where an application gateway or Kubernetes ingress can - not find a running instance of a webserver hosting the API (HTTP 502+504) or when the - HTTP request fails in general on network level. - - Note that we want to fail on other general errors on the webserver not to send bad requests in an endless loop. - """ - retryable_status_codes = (HTTPStatus.BAD_GATEWAY, HTTPStatus.GATEWAY_TIMEOUT) - return ( - isinstance(exception, AirflowException) - and exception.status_code in retryable_status_codes - or isinstance(exception, (ConnectionError, NewConnectionError)) - ) +# So far there is no other config facility in Task SDK we use ENV for the moment +# TODO: Consider these env variables jointly in task sdk together with task_sdk/src/airflow/sdk/api/client.py +API_RETRIES = int(os.getenv("AIRFLOW__EDGE__API_RETRIES", os.getenv("AIRFLOW__WORKERS__API_RETRIES", 10))) +API_RETRY_WAIT_MIN = float( + os.getenv("AIRFLOW__EDGE__API_RETRY_WAIT_MIN", os.getenv("AIRFLOW__WORKERS__API_RETRY_WAIT_MIN", 1.0)) +) +API_RETRY_WAIT_MAX = float( + os.getenv("AIRFLOW__EDGE__API_RETRY_WAIT_MAX", os.getenv("AIRFLOW__WORKERS__API_RETRY_WAIT_MAX", 90.0)) +) + + +_default_wait = wait_random_exponential(min=API_RETRY_WAIT_MIN, max=API_RETRY_WAIT_MAX) -@tenacity.retry( - stop=tenacity.stop_after_attempt(AIRFLOW__EDGE__API_RETRIES), - wait=tenacity.wait_exponential( - min=AIRFLOW__EDGE__API_RETRY_WAIT_MIN, max=AIRFLOW__EDGE__API_RETRY_WAIT_MAX - ), - retry=tenacity.retry_if_exception(_is_retryable_exception), - before_sleep=tenacity.before_log(logger, logging.WARNING), +@retry( + reraise=True, + max_attempt_number=API_RETRIES, + wait_server_errors=_default_wait, + wait_network_errors=_default_wait, + wait_timeouts=_default_wait, + wait_rate_limited=wait_retry_after(fallback=_default_wait), # No infinite timeout on HTTP 429 + before_sleep=before_log(logger, logging.WARNING), ) def _make_generic_request(method: str, rest_path: str, data: str | None = None) -> Any: signer = jwt_signer() diff --git a/providers/src/airflow/providers/edge/provider.yaml b/providers/src/airflow/providers/edge/provider.yaml index aa48ce77d5c35..6628aceab5786 100644 --- a/providers/src/airflow/providers/edge/provider.yaml +++ b/providers/src/airflow/providers/edge/provider.yaml @@ -32,6 +32,7 @@ versions: dependencies: - apache-airflow>=2.10.0 - pydantic>=2.10.2 + - retryhttp>=1.2.0 plugins: - name: edge_executor From 3b236a9b89c4759f90e9474c733524a93a086665 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 4 Jan 2025 14:06:43 +0100 Subject: [PATCH 3/6] Add pytests and fix retry handling --- .../airflow/providers/edge/cli/api_client.py | 8 +- .../providers/edge/cli/edge_command.py | 7 +- providers/tests/edge/cli/test_api_client.py | 88 +++++++++++++++++++ providers/tests/edge/cli/test_edge_command.py | 16 +++- 4 files changed, 105 insertions(+), 14 deletions(-) create mode 100644 providers/tests/edge/cli/test_api_client.py diff --git a/providers/src/airflow/providers/edge/cli/api_client.py b/providers/src/airflow/providers/edge/cli/api_client.py index 174e9d9216bfd..75fb821d259d6 100644 --- a/providers/src/airflow/providers/edge/cli/api_client.py +++ b/providers/src/airflow/providers/edge/cli/api_client.py @@ -30,7 +30,6 @@ from tenacity import before_log, wait_random_exponential from airflow.configuration import conf -from airflow.exceptions import AirflowException from airflow.providers.edge.worker_api.auth import jwt_signer from airflow.providers.edge.worker_api.datamodels import ( EdgeJobFetched, @@ -82,14 +81,9 @@ def _make_generic_request(method: str, rest_path: str, data: str | None = None) } api_endpoint = urljoin(api_url, rest_path) response = requests.request(method, url=api_endpoint, data=data, headers=headers) + response.raise_for_status() if response.status_code == HTTPStatus.NO_CONTENT: return None - if response.status_code != HTTPStatus.OK: - raise AirflowException( - f"Got {response.status_code}:{response.reason} when sending " - f"the internal api request: {response.text}", - HTTPStatus(response.status_code), - ) return json.loads(response.content) diff --git a/providers/src/airflow/providers/edge/cli/edge_command.py b/providers/src/airflow/providers/edge/cli/edge_command.py index 115923e981fb7..6835e7909d2b4 100644 --- a/providers/src/airflow/providers/edge/cli/edge_command.py +++ b/providers/src/airflow/providers/edge/cli/edge_command.py @@ -23,6 +23,7 @@ import sys from dataclasses import dataclass from datetime import datetime +from http import HTTPStatus from pathlib import Path from subprocess import Popen from time import sleep @@ -30,11 +31,11 @@ import psutil from lockfile.pidlockfile import read_pid_from_pidfile, remove_existing_pidfile, write_pid_to_pidfile +from requests import HTTPError from airflow import __version__ as airflow_version, settings from airflow.cli.cli_config import ARG_PID, ARG_VERBOSE, ActionCommand, Arg from airflow.configuration import conf -from airflow.exceptions import AirflowException from airflow.providers.edge import __version__ as edge_provider_version from airflow.providers.edge.cli.api_client import ( jobs_fetch, @@ -199,8 +200,8 @@ def start(self): except EdgeWorkerVersionException as e: logger.info("Version mismatch of Edge worker and Core. Shutting down worker.") raise SystemExit(str(e)) - except AirflowException as e: - if "404:NOT FOUND" in str(e): + except HTTPError as e: + if e.response.status_code == HTTPStatus.NOT_FOUND: raise SystemExit("Error: API endpoint is not ready, please set [edge] api_enabled=True.") raise SystemExit(str(e)) _write_pid_to_pidfile(self.pid_file_path) diff --git a/providers/tests/edge/cli/test_api_client.py b/providers/tests/edge/cli/test_api_client.py new file mode 100644 index 0000000000000..e77b157b302d5 --- /dev/null +++ b/providers/tests/edge/cli/test_api_client.py @@ -0,0 +1,88 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from http import HTTPStatus +from typing import TYPE_CHECKING +from unittest.mock import patch + +import pytest +from requests import HTTPError +from requests.exceptions import ConnectTimeout +from requests_mock import ANY + +from airflow.providers.edge.cli.api_client import _make_generic_request + +from tests_common.test_utils.config import conf_vars + +if TYPE_CHECKING: + from requests_mock import Mocker as RequestsMocker + +MOCK_ENDPOINT = "https://invalid-api-test-endpoint" + + +class TestApiClient: + @conf_vars({("edge", "api_url"): MOCK_ENDPOINT}) + def test_make_generic_request_success(self, requests_mock: RequestsMocker): + requests_mock.get( + ANY, + [ + {"json": {"test": "ok"}}, + {"status_code": HTTPStatus.NO_CONTENT}, + ], + ) + + result1 = _make_generic_request("GET", f"{MOCK_ENDPOINT}/dummy_service", "test") + result2 = _make_generic_request("GET", f"{MOCK_ENDPOINT}/service_no_content", "test") + + assert result1 == {"test": "ok"} + assert result2 is None + assert requests_mock.call_count == 2 + + @patch("time.sleep", return_value=None) + @conf_vars({("edge", "api_url"): MOCK_ENDPOINT}) + def test_make_generic_request_retry(self, mock_sleep, requests_mock: RequestsMocker): + requests_mock.get( + ANY, + [ + *[{"status_code": HTTPStatus.SERVICE_UNAVAILABLE}] * 3, + {"exc": ConnectTimeout}, + {"json": {"test": 42}}, + ], + ) + + result = _make_generic_request("GET", f"{MOCK_ENDPOINT}/flaky_service", "test") + + assert result == {"test": 42} + assert requests_mock.call_count == 5 + + @patch("time.sleep", return_value=None) + @conf_vars({("edge", "api_url"): MOCK_ENDPOINT}) + def test_make_generic_request_unrecoverable_error(self, mock_sleep, requests_mock: RequestsMocker): + requests_mock.get( + ANY, + [ + *[{"status_code": HTTPStatus.INTERNAL_SERVER_ERROR}] * 11, + {"json": {"test": "uups"}}, + ], + ) + + with pytest.raises(HTTPError) as err: + _make_generic_request("GET", f"{MOCK_ENDPOINT}/broken_service", "test") + + assert err.value.response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR + assert requests_mock.call_count == 10 diff --git a/providers/tests/edge/cli/test_edge_command.py b/providers/tests/edge/cli/test_edge_command.py index 123b06af3f9c4..9a9fe34660dbd 100644 --- a/providers/tests/edge/cli/test_edge_command.py +++ b/providers/tests/edge/cli/test_edge_command.py @@ -25,8 +25,8 @@ import pytest import time_machine +from requests import HTTPError -from airflow.exceptions import AirflowException from airflow.providers.edge.cli.edge_command import _EdgeWorkerCli, _Job, _write_pid_to_pidfile from airflow.providers.edge.models.edge_worker import EdgeWorkerState, EdgeWorkerVersionException from airflow.providers.edge.worker_api.datamodels import EdgeJobFetched @@ -282,15 +282,23 @@ def test_version_mismatch(self, mock_set_state, worker_with_job): @patch("airflow.providers.edge.cli.edge_command.worker_register") def test_start_missing_apiserver(self, mock_register_worker, worker_with_job: _EdgeWorkerCli): - mock_register_worker.side_effect = AirflowException( - "Something with 404:NOT FOUND means API is not active" + class MockResponse: + status_code = 404 + + mock_register_worker.side_effect = HTTPError( + "Something with 404:NOT FOUND means API is not active", response=MockResponse() ) with pytest.raises(SystemExit, match=r"API endpoint is not ready"): worker_with_job.start() @patch("airflow.providers.edge.cli.edge_command.worker_register") def test_start_server_error(self, mock_register_worker, worker_with_job: _EdgeWorkerCli): - mock_register_worker.side_effect = AirflowException("Something other error not FourhundretFour") + class MockResponse: + status_code = 500 + + mock_register_worker.side_effect = HTTPError( + "Something other error not FourhundretFour", response=MockResponse() + ) with pytest.raises(SystemExit, match=r"Something other"): worker_with_job.start() From 6ff5407038ef5c3be45cef131a2df2e1f94dfb87 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 4 Jan 2025 14:43:25 +0100 Subject: [PATCH 4/6] Fix mypy on pytests --- providers/tests/edge/cli/test_edge_command.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/providers/tests/edge/cli/test_edge_command.py b/providers/tests/edge/cli/test_edge_command.py index 9a9fe34660dbd..df0cd7c81407f 100644 --- a/providers/tests/edge/cli/test_edge_command.py +++ b/providers/tests/edge/cli/test_edge_command.py @@ -25,7 +25,7 @@ import pytest import time_machine -from requests import HTTPError +from requests import HTTPError, Response from airflow.providers.edge.cli.edge_command import _EdgeWorkerCli, _Job, _write_pid_to_pidfile from airflow.providers.edge.models.edge_worker import EdgeWorkerState, EdgeWorkerVersionException @@ -282,22 +282,20 @@ def test_version_mismatch(self, mock_set_state, worker_with_job): @patch("airflow.providers.edge.cli.edge_command.worker_register") def test_start_missing_apiserver(self, mock_register_worker, worker_with_job: _EdgeWorkerCli): - class MockResponse: - status_code = 404 - + mock_response = Response() + mock_response.status_code = 404 mock_register_worker.side_effect = HTTPError( - "Something with 404:NOT FOUND means API is not active", response=MockResponse() + "Something with 404:NOT FOUND means API is not active", response=mock_response ) with pytest.raises(SystemExit, match=r"API endpoint is not ready"): worker_with_job.start() @patch("airflow.providers.edge.cli.edge_command.worker_register") def test_start_server_error(self, mock_register_worker, worker_with_job: _EdgeWorkerCli): - class MockResponse: - status_code = 500 - + mock_response = Response() + mock_response.status_code = 500 mock_register_worker.side_effect = HTTPError( - "Something other error not FourhundretFour", response=MockResponse() + "Something other error not FourhundretFour", response=mock_response ) with pytest.raises(SystemExit, match=r"Something other"): worker_with_job.start() From 18fbfd0cc249fc366608597deab9730e6b30fe25 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 4 Jan 2025 15:49:15 +0100 Subject: [PATCH 5/6] Update changelog --- providers/src/airflow/providers/edge/CHANGELOG.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/providers/src/airflow/providers/edge/CHANGELOG.rst b/providers/src/airflow/providers/edge/CHANGELOG.rst index 18a1bc8db65d1..c4e152aaa69d9 100644 --- a/providers/src/airflow/providers/edge/CHANGELOG.rst +++ b/providers/src/airflow/providers/edge/CHANGELOG.rst @@ -31,6 +31,7 @@ Changelog ......... * ``Make API retries configurable via ENV. Connection loss is sustained for 5min by default.`` +* ``Align retry handling logic and tooling with Task SDK.`` Misc ~~~~ From e302bb42cc6524e4892c4945c73842873872448e Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 4 Jan 2025 15:49:51 +0100 Subject: [PATCH 6/6] Update changelog --- providers/src/airflow/providers/edge/CHANGELOG.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/src/airflow/providers/edge/CHANGELOG.rst b/providers/src/airflow/providers/edge/CHANGELOG.rst index c4e152aaa69d9..2c120ed652e82 100644 --- a/providers/src/airflow/providers/edge/CHANGELOG.rst +++ b/providers/src/airflow/providers/edge/CHANGELOG.rst @@ -31,7 +31,7 @@ Changelog ......... * ``Make API retries configurable via ENV. Connection loss is sustained for 5min by default.`` -* ``Align retry handling logic and tooling with Task SDK.`` +* ``Align retry handling logic and tooling with Task SDK, via retryhttp.`` Misc ~~~~