From 7466f57811cf8f77adbad48e6c5a7cd63d52bb92 Mon Sep 17 00:00:00 2001 From: scott-py Date: Fri, 24 May 2024 13:02:32 +0900 Subject: [PATCH 1/3] Change httpx to requests in file_task_handler - httpx does not support CIDRs in NO_PROXY - simply, convert httpx to requests, issues done - related issue: https://github.com/apache/airflow/issues/39794 --- airflow/utils/log/file_task_handler.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index a8a1fffebf089..24bad09aa3e55 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -86,7 +86,7 @@ def _set_task_deferred_context_var(): def _fetch_logs_from_service(url, log_relative_path): # Import occurs in function scope for perf. Ref: https://github.com/apache/airflow/pull/21438 - import httpx + import requests from airflow.utils.jwt_signer import JWTSigner @@ -96,7 +96,7 @@ def _fetch_logs_from_service(url, log_relative_path): expiration_time_in_seconds=conf.getint("webserver", "log_request_clock_grace", fallback=30), audience="task-instance-logs", ) - response = httpx.get( + response = requests.get( url, timeout=timeout, headers={"Authorization": signer.generate_signed_token({"filename": log_relative_path})}, @@ -574,9 +574,9 @@ def _read_from_logs_server(self, ti, worker_log_rel_path) -> tuple[list[str], li messages.append(f"Found logs served from host {url}") logs.append(response.text) except Exception as e: - from httpx import UnsupportedProtocol + from requests.exceptions import InvalidSchema - if isinstance(e, UnsupportedProtocol) and ti.task.inherits_from_empty_operator is True: + if isinstance(e, InvalidSchema) and ti.task.inherits_from_empty_operator is True: messages.append(self.inherits_from_empty_operator_log_message) else: messages.append(f"Could not read served logs: {e}") From 81c5633e6634e02d9c23d4241a65b4c1f2d8e31b Mon Sep 17 00:00:00 2001 From: scott-py Date: Fri, 24 May 2024 19:30:13 +0900 Subject: [PATCH 2/3] Add cidr no_proxy test test_log_handlers.py --- tests/utils/test_log_handlers.py | 65 ++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index 101e62a06e0c4..6eb7732c3d04c 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -21,6 +21,7 @@ import logging.config import os import re +from http import HTTPStatus from importlib import reload from pathlib import Path from unittest import mock @@ -29,6 +30,7 @@ import pendulum import pytest from kubernetes.client import models as k8s +from requests.adapters import Response from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG from airflow.exceptions import RemovedInAirflow3Warning @@ -43,6 +45,7 @@ from airflow.utils.log.file_task_handler import ( FileTaskHandler, LogType, + _fetch_logs_from_service, _interleave_logs, _parse_timestamps_in_log_file, ) @@ -779,3 +782,65 @@ def test_permissions_for_new_directories(tmp_path): assert base_dir.stat().st_mode % 0o1000 == default_permissions finally: os.umask(old_umask) + + +worker_url = "http://10.240.5.168:8793" +log_location = "dag_id=sample/run_id=manual__2024-05-23T07:18:59.298882+00:00/task_id=sourcing/attempt=1.log" +log_url = f"{worker_url}/log/{log_location}" + + +@pytest.fixture +def http_proxy(): + _origin_http_proxy = os.getenv("http_proxy") or "" + os.environ["http_proxy"] = "http://proxy.example.com" + yield + os.environ["http_proxy"] = _origin_http_proxy + + +@pytest.fixture +def no_proxy(): + _origin_no_proxy = os.getenv("no_proxy") or "" + + def _set_no_proxy(values): + os.environ["no_proxy"] = values + + yield _set_no_proxy + os.environ["no_proxy"] = _origin_no_proxy + + +@mock.patch("requests.adapters.HTTPAdapter.send") +@pytest.mark.usefixtures("http_proxy") +def test_fetch_logs_from_service_with_not_matched_no_proxy(mock_send, no_proxy): + no_proxy("localhost") + + response = Response() + response.status_code = HTTPStatus.OK + mock_send.return_value = response + + _fetch_logs_from_service(log_url, log_location) + + mock_send.assert_called() + _, kwargs = mock_send.call_args + assert "proxies" in kwargs + proxies = kwargs["proxies"] + assert "http" in proxies.keys() + assert "no" in proxies.keys() + + +@mock.patch("requests.adapters.HTTPAdapter.send") +@pytest.mark.usefixtures("http_proxy") +def test_fetch_logs_from_service_with_cidr_no_proxy(mock_send, no_proxy): + no_proxy("10.0.0.0/8") + + response = Response() + response.status_code = HTTPStatus.OK + mock_send.return_value = response + + _fetch_logs_from_service(log_url, log_location) + + mock_send.assert_called() + _, kwargs = mock_send.call_args + assert "proxies" in kwargs + proxies = kwargs["proxies"] + assert "http" not in proxies.keys() + assert "no" not in proxies.keys() From c47b3f19e0ecd2434f4c8d706ccfd86e024a4f70 Mon Sep 17 00:00:00 2001 From: "scott.py" Date: Fri, 24 May 2024 23:25:05 +0900 Subject: [PATCH 3/3] Apply monkeypatch fixture --- tests/utils/test_log_handlers.py | 31 ++++++------------------------- 1 file changed, 6 insertions(+), 25 deletions(-) diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index 6eb7732c3d04c..5c88fce0a6c13 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -789,29 +789,10 @@ def test_permissions_for_new_directories(tmp_path): log_url = f"{worker_url}/log/{log_location}" -@pytest.fixture -def http_proxy(): - _origin_http_proxy = os.getenv("http_proxy") or "" - os.environ["http_proxy"] = "http://proxy.example.com" - yield - os.environ["http_proxy"] = _origin_http_proxy - - -@pytest.fixture -def no_proxy(): - _origin_no_proxy = os.getenv("no_proxy") or "" - - def _set_no_proxy(values): - os.environ["no_proxy"] = values - - yield _set_no_proxy - os.environ["no_proxy"] = _origin_no_proxy - - @mock.patch("requests.adapters.HTTPAdapter.send") -@pytest.mark.usefixtures("http_proxy") -def test_fetch_logs_from_service_with_not_matched_no_proxy(mock_send, no_proxy): - no_proxy("localhost") +def test_fetch_logs_from_service_with_not_matched_no_proxy(mock_send, monkeypatch): + monkeypatch.setenv("http_proxy", "http://proxy.example.com") + monkeypatch.setenv("no_proxy", "localhost") response = Response() response.status_code = HTTPStatus.OK @@ -828,9 +809,9 @@ def test_fetch_logs_from_service_with_not_matched_no_proxy(mock_send, no_proxy): @mock.patch("requests.adapters.HTTPAdapter.send") -@pytest.mark.usefixtures("http_proxy") -def test_fetch_logs_from_service_with_cidr_no_proxy(mock_send, no_proxy): - no_proxy("10.0.0.0/8") +def test_fetch_logs_from_service_with_cidr_no_proxy(mock_send, monkeypatch): + monkeypatch.setenv("http_proxy", "http://proxy.example.com") + monkeypatch.setenv("no_proxy", "10.0.0.0/8") response = Response() response.status_code = HTTPStatus.OK