diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index a8a1fffebf089..24bad09aa3e55 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -86,7 +86,7 @@ def _set_task_deferred_context_var(): def _fetch_logs_from_service(url, log_relative_path): # Import occurs in function scope for perf. Ref: https://github.com/apache/airflow/pull/21438 - import httpx + import requests from airflow.utils.jwt_signer import JWTSigner @@ -96,7 +96,7 @@ def _fetch_logs_from_service(url, log_relative_path): expiration_time_in_seconds=conf.getint("webserver", "log_request_clock_grace", fallback=30), audience="task-instance-logs", ) - response = httpx.get( + response = requests.get( url, timeout=timeout, headers={"Authorization": signer.generate_signed_token({"filename": log_relative_path})}, @@ -574,9 +574,9 @@ def _read_from_logs_server(self, ti, worker_log_rel_path) -> tuple[list[str], li messages.append(f"Found logs served from host {url}") logs.append(response.text) except Exception as e: - from httpx import UnsupportedProtocol + from requests.exceptions import InvalidSchema - if isinstance(e, UnsupportedProtocol) and ti.task.inherits_from_empty_operator is True: + if isinstance(e, InvalidSchema) and ti.task.inherits_from_empty_operator is True: messages.append(self.inherits_from_empty_operator_log_message) else: messages.append(f"Could not read served logs: {e}") diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index 101e62a06e0c4..5c88fce0a6c13 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -21,6 +21,7 @@ import logging.config import os import re +from http import HTTPStatus from importlib import reload from pathlib import Path from unittest import mock @@ -29,6 +30,7 @@ import pendulum import pytest from kubernetes.client import models as k8s +from requests.adapters import Response from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG from airflow.exceptions import RemovedInAirflow3Warning @@ -43,6 +45,7 @@ from airflow.utils.log.file_task_handler import ( FileTaskHandler, LogType, + _fetch_logs_from_service, _interleave_logs, _parse_timestamps_in_log_file, ) @@ -779,3 +782,46 @@ def test_permissions_for_new_directories(tmp_path): assert base_dir.stat().st_mode % 0o1000 == default_permissions finally: os.umask(old_umask) + + +worker_url = "http://10.240.5.168:8793" +log_location = "dag_id=sample/run_id=manual__2024-05-23T07:18:59.298882+00:00/task_id=sourcing/attempt=1.log" +log_url = f"{worker_url}/log/{log_location}" + + +@mock.patch("requests.adapters.HTTPAdapter.send") +def test_fetch_logs_from_service_with_not_matched_no_proxy(mock_send, monkeypatch): + monkeypatch.setenv("http_proxy", "http://proxy.example.com") + monkeypatch.setenv("no_proxy", "localhost") + + response = Response() + response.status_code = HTTPStatus.OK + mock_send.return_value = response + + _fetch_logs_from_service(log_url, log_location) + + mock_send.assert_called() + _, kwargs = mock_send.call_args + assert "proxies" in kwargs + proxies = kwargs["proxies"] + assert "http" in proxies.keys() + assert "no" in proxies.keys() + + +@mock.patch("requests.adapters.HTTPAdapter.send") +def test_fetch_logs_from_service_with_cidr_no_proxy(mock_send, monkeypatch): + monkeypatch.setenv("http_proxy", "http://proxy.example.com") + monkeypatch.setenv("no_proxy", "10.0.0.0/8") + + response = Response() + response.status_code = HTTPStatus.OK + mock_send.return_value = response + + _fetch_logs_from_service(log_url, log_location) + + mock_send.assert_called() + _, kwargs = mock_send.call_args + assert "proxies" in kwargs + proxies = kwargs["proxies"] + assert "http" not in proxies.keys() + assert "no" not in proxies.keys()