diff --git a/airflow-core/src/airflow/config_templates/airflow_local_settings.py b/airflow-core/src/airflow/config_templates/airflow_local_settings.py
index 6270fba7ba3ba..e658531fcde91 100644
--- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py
+++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py
@@ -27,7 +27,7 @@
 from airflow.exceptions import AirflowException

 if TYPE_CHECKING:
-    from airflow.logging_config import RemoteLogIO
+    from airflow.logging_config import RemoteLogIO, RemoteLogStreamIO

 LOG_LEVEL: str = conf.get_mandatory_value("logging", "LOGGING_LEVEL").upper()
@@ -119,7 +119,7 @@
 ##################

 REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging")
-REMOTE_TASK_LOG: RemoteLogIO | None = None
+REMOTE_TASK_LOG: RemoteLogIO | RemoteLogStreamIO | None = None
 DEFAULT_REMOTE_CONN_ID: str | None = None
diff --git a/airflow-core/src/airflow/logging/remote.py b/airflow-core/src/airflow/logging/remote.py
index ec5ab70ab20db..5f60fc6ffff43 100644
--- a/airflow-core/src/airflow/logging/remote.py
+++ b/airflow-core/src/airflow/logging/remote.py
@@ -18,13 +18,13 @@
 from __future__ import annotations

 import os
-from typing import TYPE_CHECKING, Protocol
+from typing import TYPE_CHECKING, Protocol, runtime_checkable

 if TYPE_CHECKING:
     import structlog.typing

     from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
-    from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo
+    from airflow.utils.log.file_task_handler import LogResponse, StreamingLogResponse


 class RemoteLogIO(Protocol):
@@ -44,6 +44,15 @@ def upload(self, path: os.PathLike | str, ti: RuntimeTI) -> None:
         """Upload the given log path to the remote storage."""
         ...

-    def read(self, relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages | None]:
+    def read(self, relative_path: str, ti: RuntimeTI) -> LogResponse:
         """Read logs from the given remote log path."""
         ...
+
+
+@runtime_checkable
+class RemoteLogStreamIO(RemoteLogIO, Protocol):
+    """Interface for remote task loggers with stream-based read support."""
+
+    def stream(self, relative_path: str, ti: RuntimeTI) -> StreamingLogResponse:
+        """Stream-based read interface for reading logs from the given remote log path."""
+        ...
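The new RemoteLogStreamIO protocol is structural and runtime_checkable, so a provider opts in simply by implementing a stream method alongside the existing upload/read pair; no inheritance from the protocol class is required. Below is a minimal sketch of what such an implementation could look like. ExampleRemoteLogIO and its in-memory store are hypothetical, standing in for a real backend such as S3 or OSS, and the sketch assumes the protocol members are exactly the ones shown in the hunk above.

# Hypothetical provider-side sketch: ExampleRemoteLogIO and its in-memory
# "remote" store stand in for a real backend (S3, OSS, ...).
from __future__ import annotations

import os
from collections.abc import Generator


class ExampleRemoteLogIO:
    def __init__(self) -> None:
        self._store: dict[str, str] = {}

    def upload(self, path: os.PathLike | str, ti) -> None:
        # Push the local log file to the "remote" store.
        with open(path) as f:
            self._store[str(path)] = f.read()

    def read(self, relative_path: str, ti) -> tuple[list[str], list[str] | None]:
        # Legacy interface: the whole log is materialised as message strings.
        sources = [f"example://{relative_path}"]
        data = self._store.get(relative_path)
        return sources, None if data is None else [data]

    def stream(self, relative_path: str, ti) -> tuple[list[str], list[Generator[str, None, None]]]:
        # Streaming interface: yield unparsed lines lazily, one generator per
        # log "file", instead of loading the whole log into memory at once.
        def lines() -> Generator[str, None, None]:
            yield from self._store.get(relative_path, "").splitlines()

        return [f"example://{relative_path}"], [lines()]

Because the protocol is decorated with runtime_checkable, isinstance(ExampleRemoteLogIO(), RemoteLogStreamIO) should hold for a class shaped like this; keep in mind that isinstance against a runtime-checkable protocol only verifies that the named methods exist, not that their signatures match.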
diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py
index 606c7369a1afa..44ccf2cde3803 100644
--- a/airflow-core/src/airflow/utils/log/file_task_handler.py
+++ b/airflow-core/src/airflow/utils/log/file_task_handler.py
@@ -68,16 +68,15 @@

 # These types are similar, but have distinct names to make processing them less error prone
 LogMessages: TypeAlias = list[str]
-"""The legacy format of log messages before 3.0.2"""
+"""The legacy format of log messages before 3.0.4"""
 LogSourceInfo: TypeAlias = list[str]
 """Information _about_ the log fetching process for display to a user"""
 RawLogStream: TypeAlias = Generator[str, None, None]
 """Raw log stream, containing unparsed log lines."""
-LegacyLogResponse: TypeAlias = tuple[LogSourceInfo, LogMessages]
+LogResponse: TypeAlias = tuple[LogSourceInfo, LogMessages | None]
 """Legacy log response, containing source information and log messages."""
-LogResponse: TypeAlias = tuple[LogSourceInfo, list[RawLogStream]]
-LogResponseWithSize: TypeAlias = tuple[LogSourceInfo, list[RawLogStream], int]
-"""Log response, containing source information, stream of log lines, and total log size."""
+StreamingLogResponse: TypeAlias = tuple[LogSourceInfo, list[RawLogStream]]
+"""Streaming log response, containing source information and streams of log lines."""
 StructuredLogStream: TypeAlias = Generator["StructuredLogMessage", None, None]
 """Structured log stream, containing structured log messages."""
 LogHandlerOutputStream: TypeAlias = (
@@ -856,7 +855,7 @@ def _init_file(self, ti, *, identifier: str | None = None):
     @staticmethod
     def _read_from_local(
         worker_log_path: Path,
-    ) -> LogResponse:
+    ) -> StreamingLogResponse:
         sources: LogSourceInfo = []
         log_streams: list[RawLogStream] = []
         paths = sorted(worker_log_path.parent.glob(worker_log_path.name + "*"))
@@ -873,7 +872,7 @@ def _read_from_logs_server(
         self,
         ti: TaskInstance | TaskInstanceHistory,
         worker_log_rel_path: str,
-    ) -> LogResponse:
+    ) -> StreamingLogResponse:
         sources: LogSourceInfo = []
         log_streams: list[RawLogStream] = []
         try:
@@ -911,7 +910,7 @@
             logger.exception("Could not read served logs")
         return sources, log_streams

-    def _read_remote_logs(self, ti, try_number, metadata=None) -> LegacyLogResponse | LogResponse:
+    def _read_remote_logs(self, ti, try_number, metadata=None) -> LogResponse | StreamingLogResponse:
         """
         Implement in subclasses to read from the remote service.

@@ -936,5 +935,10 @@ def _read_remote_logs(self, ti, try_number, metadata=None) -> LegacyLogResponse
         # This living here is not really a good plan, but it just about works for now.
         # Ideally we move all the read+combine logic in to TaskLogReader and out of the task handler.
         path = self._render_filename(ti, try_number)
+        if stream_method := getattr(remote_io, "stream", None):
+            # Use the .stream interface if the provider's RemoteIO supports it
+            sources, logs = stream_method(path, ti)
+            return sources, logs or []
+        # Fall back to the .read interface
         sources, logs = remote_io.read(path, ti)
         return sources, logs or []
diff --git a/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py b/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py
index 56c65a7c7d718..1fcc9be4f8962 100644
--- a/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py
+++ b/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py
@@ -24,7 +24,7 @@

 import pytest

-from airflow.providers.alibaba.cloud.log.oss_task_handler import OSSRemoteLogIO, OSSTaskHandler  # noqa: F401
+from airflow.providers.alibaba.cloud.log.oss_task_handler import OSSTaskHandler
 from airflow.utils.state import TaskInstanceState
 from airflow.utils.timezone import datetime
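Note that the handler dispatches on getattr(remote_io, "stream", None) rather than an isinstance check against the new runtime-checkable protocol, so any object that grows a stream method takes the streaming path, typed protocol or not. Here is a self-contained sketch of that preference order; LegacyIO and StreamingIO are hypothetical stand-ins for provider IO objects, and read_remote mirrors only the last few lines of _read_remote_logs above.

# Toy stand-ins (hypothetical names) demonstrating the dispatch order.
from __future__ import annotations

from collections.abc import Generator


class LegacyIO:
    def read(self, path: str, ti) -> tuple[list[str], list[str] | None]:
        return [f"read from {path}"], ["line 1\nline 2"]


class StreamingIO(LegacyIO):
    def stream(self, path: str, ti) -> tuple[list[str], list[Generator[str, None, None]]]:
        def lines() -> Generator[str, None, None]:
            yield "line 1"
            yield "line 2"

        return [f"streamed from {path}"], [lines()]


def read_remote(remote_io, path: str, ti=None):
    # Mirrors the handler's walrus-operator dispatch: prefer .stream when the
    # object has one, otherwise fall back to the legacy .read interface.
    if stream_method := getattr(remote_io, "stream", None):
        sources, logs = stream_method(path, ti)
        return sources, logs or []
    sources, logs = remote_io.read(path, ti)
    return sources, logs or []


print(read_remote(LegacyIO(), "dag_id/run_id/task_id/1.log")[0])     # ['read from ...']
print(read_remote(StreamingIO(), "dag_id/run_id/task_id/1.log")[0])  # ['streamed from ...']

The getattr probe keeps providers built against the older interface working unchanged, while the isinstance(remote_io, RemoteLogStreamIO) form remains available where static typing matters.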