Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def _set_task_deferred_context_var():

def _fetch_logs_from_service(url, log_relative_path):
# Import occurs in function scope for perf. Ref: https://github.com/apache/airflow/pull/21438
import httpx
import requests

from airflow.utils.jwt_signer import JWTSigner

Expand All @@ -96,7 +96,7 @@ def _fetch_logs_from_service(url, log_relative_path):
expiration_time_in_seconds=conf.getint("webserver", "log_request_clock_grace", fallback=30),
audience="task-instance-logs",
)
response = httpx.get(
response = requests.get(
url,
timeout=timeout,
headers={"Authorization": signer.generate_signed_token({"filename": log_relative_path})},
Expand Down Expand Up @@ -574,9 +574,9 @@ def _read_from_logs_server(self, ti, worker_log_rel_path) -> tuple[list[str], li
messages.append(f"Found logs served from host {url}")
logs.append(response.text)
except Exception as e:
from httpx import UnsupportedProtocol
from requests.exceptions import InvalidSchema
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be

from requests.exceptions import InvalidURL

Otherwise, the fix for EmptyOperator's log message (#35536) doesn't work.

I'm going to open a new PR later if no one beats me to it.


if isinstance(e, UnsupportedProtocol) and ti.task.inherits_from_empty_operator is True:
if isinstance(e, InvalidSchema) and ti.task.inherits_from_empty_operator is True:
messages.append(self.inherits_from_empty_operator_log_message)
else:
messages.append(f"Could not read served logs: {e}")
Expand Down
46 changes: 46 additions & 0 deletions tests/utils/test_log_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import logging.config
import os
import re
from http import HTTPStatus
from importlib import reload
from pathlib import Path
from unittest import mock
Expand All @@ -29,6 +30,7 @@
import pendulum
import pytest
from kubernetes.client import models as k8s
from requests.adapters import Response

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
from airflow.exceptions import RemovedInAirflow3Warning
Expand All @@ -43,6 +45,7 @@
from airflow.utils.log.file_task_handler import (
FileTaskHandler,
LogType,
_fetch_logs_from_service,
_interleave_logs,
_parse_timestamps_in_log_file,
)
Expand Down Expand Up @@ -779,3 +782,46 @@ def test_permissions_for_new_directories(tmp_path):
assert base_dir.stat().st_mode % 0o1000 == default_permissions
finally:
os.umask(old_umask)


# Shared inputs for the _fetch_logs_from_service proxy tests in this module.
# Base URL of the worker log server; the host is an address inside 10.0.0.0/8.
worker_url = "http://10.240.5.168:8793"
# Relative path of the task log, passed to _fetch_logs_from_service as log_relative_path.
log_location = "dag_id=sample/run_id=manual__2024-05-23T07:18:59.298882+00:00/task_id=sourcing/attempt=1.log"
# Full URL passed to _fetch_logs_from_service: worker base URL + "/log/" + relative path.
log_url = f"{worker_url}/log/{log_location}"


@mock.patch("requests.adapters.HTTPAdapter.send")
def test_fetch_logs_from_service_with_not_matched_no_proxy(mock_send, monkeypatch):
    """Proxy settings are forwarded when ``no_proxy`` does not cover the worker host.

    ``no_proxy`` is set to ``localhost`` while the log URL targets
    ``10.240.5.168``, so the ``proxies`` mapping handed to
    ``HTTPAdapter.send`` is expected to contain both the ``http`` and the
    ``no`` entries.
    """
    environment = {
        "http_proxy": "http://proxy.example.com",
        "no_proxy": "localhost",
    }
    for name, value in environment.items():
        monkeypatch.setenv(name, value)

    # Canned successful reply so no real network traffic happens.
    ok_response = Response()
    ok_response.status_code = HTTPStatus.OK
    mock_send.return_value = ok_response

    _fetch_logs_from_service(log_url, log_location)

    mock_send.assert_called()
    send_kwargs = mock_send.call_args.kwargs
    assert "proxies" in send_kwargs
    forwarded_proxies = send_kwargs["proxies"]
    assert "http" in forwarded_proxies
    assert "no" in forwarded_proxies


@mock.patch("requests.adapters.HTTPAdapter.send")
def test_fetch_logs_from_service_with_cidr_no_proxy(mock_send, monkeypatch):
    """Proxy settings are dropped when a CIDR ``no_proxy`` entry covers the worker host.

    ``no_proxy`` is set to ``10.0.0.0/8`` and the log URL targets
    ``10.240.5.168``, so the ``proxies`` mapping handed to
    ``HTTPAdapter.send`` is expected to contain neither the ``http`` nor
    the ``no`` entry.
    """
    environment = {
        "http_proxy": "http://proxy.example.com",
        "no_proxy": "10.0.0.0/8",
    }
    for name, value in environment.items():
        monkeypatch.setenv(name, value)

    # Canned successful reply so no real network traffic happens.
    ok_response = Response()
    ok_response.status_code = HTTPStatus.OK
    mock_send.return_value = ok_response

    _fetch_logs_from_service(log_url, log_location)

    mock_send.assert_called()
    send_kwargs = mock_send.call_args.kwargs
    assert "proxies" in send_kwargs
    forwarded_proxies = send_kwargs["proxies"]
    assert "http" not in forwarded_proxies
    assert "no" not in forwarded_proxies