From 6b778b4754e66c23155f73f3a0c5e6bee1b54f7e Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Fri, 22 Aug 2025 00:02:05 +0800 Subject: [PATCH 01/11] Add stream method to RemoteIO --- airflow-core/src/airflow/logging/remote.py | 8 ++++-- .../airflow/utils/log/file_task_handler.py | 13 +++++++--- .../alibaba/cloud/log/oss_task_handler.py | 5 +++- .../cloud/log/test_oss_task_handler.py | 24 +++++++++++++++++- .../amazon/aws/log/cloudwatch_task_handler.py | 5 +++- .../amazon/aws/log/s3_task_handler.py | 5 +++- .../aws/log/test_cloudwatch_task_handler.py | 5 ++++ .../amazon/aws/log/test_s3_task_handler.py | 5 ++++ .../apache/hdfs/log/hdfs_task_handler.py | 5 +++- .../apache/hdfs/log/test_hdfs_task_handler.py | 24 +++++++++++++++++- .../google/cloud/log/gcs_task_handler.py | 5 +++- .../google/cloud/log/test_gcs_task_handler.py | 25 ++++++++++++++++++- .../microsoft/azure/log/wasb_task_handler.py | 5 +++- .../azure/log/test_wasb_task_handler.py | 24 ++++++++++++++++++ 14 files changed, 144 insertions(+), 14 deletions(-) diff --git a/airflow-core/src/airflow/logging/remote.py b/airflow-core/src/airflow/logging/remote.py index ec5ab70ab20db..ed00c15ac3464 100644 --- a/airflow-core/src/airflow/logging/remote.py +++ b/airflow-core/src/airflow/logging/remote.py @@ -24,7 +24,7 @@ import structlog.typing from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI - from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo + from airflow.utils.log.file_task_handler import LegacyLogResponse, LogResponse class RemoteLogIO(Protocol): @@ -44,6 +44,10 @@ def upload(self, path: os.PathLike | str, ti: RuntimeTI) -> None: """Upload the given log path to the remote storage.""" ... - def read(self, relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages | None]: + def read(self, relative_path: str, ti: RuntimeTI) -> LegacyLogResponse: """Read logs from the given remote log path.""" ... + + def stream(self, relative_path: str, ti: RuntimeTI) -> LogResponse: + """Stream-based read interface for reading logs from the given remote log path.""" + ... diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py index 606c7369a1afa..9128d86225e93 100644 --- a/airflow-core/src/airflow/utils/log/file_task_handler.py +++ b/airflow-core/src/airflow/utils/log/file_task_handler.py @@ -73,7 +73,7 @@ """Information _about_ the log fetching process for display to a user""" RawLogStream: TypeAlias = Generator[str, None, None] """Raw log stream, containing unparsed log lines.""" -LegacyLogResponse: TypeAlias = tuple[LogSourceInfo, LogMessages] +LegacyLogResponse: TypeAlias = tuple[LogSourceInfo, LogMessages | None] """Legacy log response, containing source information and log messages.""" LogResponse: TypeAlias = tuple[LogSourceInfo, list[RawLogStream]] LogResponseWithSize: TypeAlias = tuple[LogSourceInfo, list[RawLogStream], int] @@ -936,5 +936,12 @@ def _read_remote_logs(self, ti, try_number, metadata=None) -> LegacyLogResponse # This living here is not really a good plan, but it just about works for now. # Ideally we move all the read+combine logic in to TaskLogReader and out of the task handler. 
path = self._render_filename(ti, try_number) - sources, logs = remote_io.read(path, ti) - return sources, logs or [] + logs: LogMessages | list[RawLogStream] | None # extra typing to avoid mypy assignment error + try: + # Use .stream interface if provider's RemoteIO supports it + sources, logs = remote_io.stream(path, ti) + return sources, logs or [] + except (AttributeError, NotImplementedError): + # Fallback to .read interface + sources, logs = remote_io.read(path, ti) + return sources, logs or [] diff --git a/providers/alibaba/src/airflow/providers/alibaba/cloud/log/oss_task_handler.py b/providers/alibaba/src/airflow/providers/alibaba/cloud/log/oss_task_handler.py index e69df54066520..2c06c327c514d 100644 --- a/providers/alibaba/src/airflow/providers/alibaba/cloud/log/oss_task_handler.py +++ b/providers/alibaba/src/airflow/providers/alibaba/cloud/log/oss_task_handler.py @@ -33,7 +33,7 @@ if TYPE_CHECKING: from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI - from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo + from airflow.utils.log.file_task_handler import LogMessages, LogResponse, LogSourceInfo @attrs.define(kw_only=True) @@ -95,6 +95,9 @@ def read(self, relative_path, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages return messages, logs return messages, None + def stream(self, relative_path: str, ti: RuntimeTI) -> LogResponse: + raise NotImplementedError + def oss_log_exists(self, remote_log_location): """ Check if remote_log_location exists in remote storage. diff --git a/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py b/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py index 56c65a7c7d718..621d1d153a808 100644 --- a/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py +++ b/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py @@ -24,7 +24,7 @@ import pytest -from airflow.providers.alibaba.cloud.log.oss_task_handler import OSSRemoteLogIO, OSSTaskHandler # noqa: F401 +from airflow.providers.alibaba.cloud.log.oss_task_handler import OSSRemoteLogIO, OSSTaskHandler from airflow.utils.state import TaskInstanceState from airflow.utils.timezone import datetime @@ -45,6 +45,28 @@ MOCK_FILE_PATH = "mock_file_path" +class TestOSSRemoteLogIO: + @pytest.fixture(autouse=True) + def setup_tests(self, create_runtime_ti): + from airflow.sdk import BaseOperator + + # setup remote IO + self.base_log_folder = "local/airflow/logs" + self.oss_log_folder = f"oss://{MOCK_BUCKET_NAME}/airflow/logs" + self.oss_remote_log_io = OSSRemoteLogIO( + remote_base=self.oss_log_folder, + base_log_folder=self.base_log_folder, + delete_local_copy=True, + ) + # setup task instance + self.ti = create_runtime_ti(BaseOperator(task_id="task_1")) + + def test_stream(self): + """Test that the stream method raises NotImplementedError.""" + with pytest.raises(NotImplementedError): + self.oss_remote_log_io.stream("some/log/path", self.ti) + + class TestOSSTaskHandler: def setup_method(self): self.base_log_folder = "local/airflow/logs/1.log" diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py index 79d56e7b6ad39..29756516432b0 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py @@ -41,7 +41,7 @@ from airflow.models.taskinstance import TaskInstance from
airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI - from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo + from airflow.utils.log.file_task_handler import LogMessages, LogResponse, LogSourceInfo def json_serialize_legacy(value: Any) -> str | None: @@ -176,6 +176,9 @@ def read(self, relative_path, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages return messages, logs + def stream(self, relative_path: str, ti: RuntimeTI) -> LogResponse: + raise NotImplementedError + def get_cloudwatch_logs(self, stream_name: str, task_instance: RuntimeTI): """ Return all logs from the given log stream. diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py index caf5fae0962c1..9ecdf6fe7f5d5 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py @@ -35,7 +35,7 @@ if TYPE_CHECKING: from airflow.models.taskinstance import TaskInstance from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI - from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo + from airflow.utils.log.file_task_handler import LogMessages, LogResponse, LogSourceInfo @attrs.define @@ -164,6 +164,9 @@ def read(self, relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMes return messages, logs return messages, None + def stream(self, relative_path: str, ti: RuntimeTI) -> LogResponse: + raise NotImplementedError + class S3TaskHandler(FileTaskHandler, LoggingMixin): """ diff --git a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py index 01eebfa0b1f4f..60ee2e2cc77df 100644 --- a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py +++ b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py @@ -177,6 +177,11 @@ def test_event_to_str(self): ] ) + def test_stream(self): + """Test that the stream method raises NotImplementedError.""" + with pytest.raises(NotImplementedError): + self.subject.stream("some/log/path", self.ti) + @pytest.mark.db_test class TestCloudwatchTaskHandler: diff --git a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py index 65f70ed1b75aa..cc9a041a38d77 100644 --- a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py +++ b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py @@ -193,6 +193,11 @@ def test_write_raises(self, caplog): assert rec.message == f"Could not write logs to {url}" assert rec.exc_info is not None + def test_stream(self): + """Test that the stream method raises NotImplementedError.""" + with pytest.raises(NotImplementedError): + self.subject.stream("some/log/path", self.ti) + @pytest.mark.db_test class TestS3TaskHandler: diff --git a/providers/apache/hdfs/src/airflow/providers/apache/hdfs/log/hdfs_task_handler.py b/providers/apache/hdfs/src/airflow/providers/apache/hdfs/log/hdfs_task_handler.py index ed76365ac456a..01099900b5728 100644 --- a/providers/apache/hdfs/src/airflow/providers/apache/hdfs/log/hdfs_task_handler.py +++ b/providers/apache/hdfs/src/airflow/providers/apache/hdfs/log/hdfs_task_handler.py @@ -35,7 +35,7 @@ if TYPE_CHECKING: from airflow.models.taskinstance import TaskInstance from airflow.sdk.types import RuntimeTaskInstanceProtocol as 
RuntimeTI - from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo + from airflow.utils.log.file_task_handler import LogMessages, LogResponse, LogSourceInfo @attrs.define(kw_only=True) @@ -76,6 +76,9 @@ def read(self, relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMes messages.append(f"No logs found on hdfs for ti={ti}") return messages, logs + def stream(self, relative_path: str, ti: RuntimeTI) -> LogResponse: + raise NotImplementedError + class HdfsTaskHandler(FileTaskHandler, LoggingMixin): """ diff --git a/providers/apache/hdfs/tests/unit/apache/hdfs/log/test_hdfs_task_handler.py b/providers/apache/hdfs/tests/unit/apache/hdfs/log/test_hdfs_task_handler.py index b5e2adb757797..76afe4be86e35 100644 --- a/providers/apache/hdfs/tests/unit/apache/hdfs/log/test_hdfs_task_handler.py +++ b/providers/apache/hdfs/tests/unit/apache/hdfs/log/test_hdfs_task_handler.py @@ -25,7 +25,7 @@ import pytest from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook -from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsTaskHandler +from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsRemoteLogIO, HdfsTaskHandler from airflow.utils.state import TaskInstanceState from airflow.utils.timezone import datetime @@ -37,6 +37,28 @@ DEFAULT_DATE = datetime(2020, 8, 10) +class TestHdfsRemoteLogIO: + @pytest.fixture(autouse=True) + def setup_tests(self, create_runtime_ti): + from airflow.sdk import BaseOperator + + # setup remote IO + self.base_log_folder = "local/airflow/logs" + self.remote_base = "/remote/log/location" + self.hdfs_remote_log_io = HdfsRemoteLogIO( + remote_base=self.remote_base, + base_log_folder=self.base_log_folder, + delete_local_copy=True, + ) + # setup task instance + self.ti = create_runtime_ti(BaseOperator(task_id="task_1")) + + def test_stream(self): + """Test that the stream method raises NotImplementedError.""" + with pytest.raises(NotImplementedError): + self.hdfs_remote_log_io.stream("some/log/path", self.ti) + + class TestHdfsTaskHandler: @pytest.fixture(autouse=True) def ti(self, create_task_instance, create_log_template): diff --git a/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py b/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py index 5ad1e0c6d867f..d554085806811 100644 --- a/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py +++ b/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py @@ -45,7 +45,7 @@ if TYPE_CHECKING: from airflow.models.taskinstance import TaskInstance from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI - from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo + from airflow.utils.log.file_task_handler import LogMessages, LogResponse, LogSourceInfo _DEFAULT_SCOPESS = frozenset( [ @@ -177,6 +177,9 @@ def read(self, relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMes messages.append(f"Unable to read remote log {e}") return messages, logs + def stream(self, relative_path: str, ti: RuntimeTI) -> LogResponse: + raise NotImplementedError + class GCSTaskHandler(FileTaskHandler, LoggingMixin): """ diff --git a/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py b/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py index 135d443244559..fcb0758654f0e 100644 --- a/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py +++ b/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py @@ -24,7 
+24,7 @@ import pytest -from airflow.providers.google.cloud.log.gcs_task_handler import GCSTaskHandler +from airflow.providers.google.cloud.log.gcs_task_handler import GCSRemoteLogIO, GCSTaskHandler from airflow.utils.state import TaskInstanceState from airflow.utils.timezone import datetime @@ -33,6 +33,29 @@ from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS +@pytest.mark.db_test +class TestGCSRemoteLogIO: + @pytest.fixture(autouse=True) + def setup_tests(self, create_runtime_ti): + from airflow.sdk import BaseOperator + + # setup remote IO + self.base_log_folder = "local/airflow/logs" + self.gcs_log_folder = "gs://bucket/remote/log/location" + self.gcs_remote_log_io = GCSRemoteLogIO( + remote_base=self.gcs_log_folder, + base_log_folder=self.base_log_folder, + delete_local_copy=True, + ) + # setup task instance + self.ti = create_runtime_ti(BaseOperator(task_id="task_1")) + + def test_stream(self): + """Test that the stream method raises NotImplementedError.""" + with pytest.raises(NotImplementedError): + self.gcs_remote_log_io.stream("some/log/path", self.ti) + + @pytest.mark.db_test class TestGCSTaskHandler: @pytest.fixture(autouse=True) diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/log/wasb_task_handler.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/log/wasb_task_handler.py index a51625eb0396b..ebf8d25f9b010 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/log/wasb_task_handler.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/log/wasb_task_handler.py @@ -36,7 +36,7 @@ from airflow.models.taskinstance import TaskInstance from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI - from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo + from airflow.utils.log.file_task_handler import LogMessages, LogResponse, LogSourceInfo @attrs.define @@ -121,6 +121,9 @@ def read(self, relative_path, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages self.log.exception("Could not read blob") return messages, logs + def stream(self, relative_path: str, ti: RuntimeTI) -> LogResponse: + raise NotImplementedError + def wasb_log_exists(self, remote_log_location: str) -> bool: """ Check if remote_log_location exists in remote storage. 
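The try/except dispatch that this patch adds to _read_remote_logs can be exercised in isolation with the sketch below. It is an illustrative example only: StubRemoteIO and its return values are hypothetical stand-ins for a provider's RemoteLogIO, not code from this series.

class StubRemoteIO:
    """Hypothetical provider IO whose stream() is still a NotImplementedError stub."""

    def read(self, relative_path, ti):
        return [f"fetched {relative_path}"], ["log line 1", "log line 2"]

    def stream(self, relative_path, ti):
        raise NotImplementedError


def read_remote(remote_io, path, ti):
    try:
        # Use the .stream interface if the provider's RemoteIO supports it
        sources, logs = remote_io.stream(path, ti)
    except (AttributeError, NotImplementedError):
        # Fall back to the legacy .read interface
        sources, logs = remote_io.read(path, ti)
    return sources, logs or []


print(read_remote(StubRemoteIO(), "dag_id/run_id/task_id/1.log", ti=None))
# -> (['fetched dag_id/run_id/task_id/1.log'], ['log line 1', 'log line 2'])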
diff --git a/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py b/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py index 4a09b28983375..caefb4b849a4d 100644 --- a/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py +++ b/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py @@ -41,6 +41,30 @@ DEFAULT_DATE = datetime(2020, 8, 10) +class TestWasbRemoteLogIO: + @pytest.fixture(autouse=True) + def setup_tests(self, create_runtime_ti): + from airflow.sdk import BaseOperator + + # setup remote IO + self.base_log_folder = "local/airflow/logs" + self.wasb_log_folder = "wasb://container@account.blob.core.windows.net/remote/log/location" + self.wasb_container = "container" + self.wasb_remote_log_io = WasbRemoteLogIO( + remote_base=self.wasb_log_folder, + base_log_folder=self.base_log_folder, + delete_local_copy=True, + wasb_container=self.wasb_container, + ) + # setup task instance + self.ti = create_runtime_ti(BaseOperator(task_id="task_1")) + + def test_stream(self): + """Test that the stream method raises NotImplementedError.""" + with pytest.raises(NotImplementedError): + self.wasb_remote_log_io.stream("some/log/path", self.ti) + + class TestWasbTaskHandler: @pytest.fixture(autouse=True) def ti(self, create_task_instance, create_log_template): From c88d360008dbf835012c8d7a9788cd2f7cc47c7a Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Fri, 22 Aug 2025 17:29:11 +0800 Subject: [PATCH 02/11] Import BaseOperator from version_compat --- .../tests/unit/alibaba/cloud/log/test_oss_task_handler.py | 3 +-- .../hdfs/tests/unit/apache/hdfs/log/test_hdfs_task_handler.py | 3 +-- .../tests/unit/google/cloud/log/test_gcs_task_handler.py | 3 +-- .../tests/unit/microsoft/azure/log/test_wasb_task_handler.py | 3 +-- 4 files changed, 4 insertions(+), 8 deletions(-) diff --git a/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py b/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py index 621d1d153a808..ea17879525d56 100644 --- a/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py +++ b/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py @@ -25,6 +25,7 @@ import pytest from airflow.providers.alibaba.cloud.log.oss_task_handler import OSSRemoteLogIO, OSSTaskHandler +from airflow.providers.alibaba.version_compat import BaseOperator from airflow.utils.state import TaskInstanceState from airflow.utils.timezone import datetime @@ -48,8 +49,6 @@ class TestOSSRemoteLogIO: @pytest.fixture(autouse=True) def setup_tests(self, create_runtime_ti): - from airflow.sdk import BaseOperator - # setup remote IO self.base_log_folder = "local/airflow/logs" self.oss_log_folder = f"oss://{MOCK_BUCKET_NAME}/airflow/logs" diff --git a/providers/apache/hdfs/tests/unit/apache/hdfs/log/test_hdfs_task_handler.py b/providers/apache/hdfs/tests/unit/apache/hdfs/log/test_hdfs_task_handler.py index 76afe4be86e35..4ab75136ad398 100644 --- a/providers/apache/hdfs/tests/unit/apache/hdfs/log/test_hdfs_task_handler.py +++ b/providers/apache/hdfs/tests/unit/apache/hdfs/log/test_hdfs_task_handler.py @@ -26,6 +26,7 @@ from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsRemoteLogIO, HdfsTaskHandler +from airflow.providers.apache.hdfs.version_compat import BaseOperator from airflow.utils.state import TaskInstanceState from airflow.utils.timezone import datetime @@ -40,8
+41,6 @@ class TestHdfsRemoteLogIO: @pytest.fixture(autouse=True) def setup_tests(self, create_runtime_ti): - from airflow.sdk import BaseOperator - # setup remote IO self.base_log_folder = "local/airflow/logs" self.remote_base = "/remote/log/location" diff --git a/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py b/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py index fcb0758654f0e..50177f228adc7 100644 --- a/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py +++ b/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py @@ -25,6 +25,7 @@ import pytest from airflow.providers.google.cloud.log.gcs_task_handler import GCSRemoteLogIO, GCSTaskHandler +from airflow.providers.google.version_compat import BaseOperator from airflow.utils.state import TaskInstanceState from airflow.utils.timezone import datetime @@ -37,8 +38,6 @@ class TestGCSRemoteLogIO: @pytest.fixture(autouse=True) def setup_tests(self, create_runtime_ti): - from airflow.sdk import BaseOperator - # setup remote IO self.base_log_folder = "local/airflow/logs" self.gcs_log_folder = "gs://bucket/remote/log/location" diff --git a/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py b/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py index caefb4b849a4d..d6ac1a9a23fd3 100644 --- a/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py +++ b/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py @@ -28,6 +28,7 @@ from airflow.providers.microsoft.azure.hooks.wasb import WasbHook from airflow.providers.microsoft.azure.log.wasb_task_handler import WasbRemoteLogIO, WasbTaskHandler +from airflow.providers.microsoft.azure.version_compat import BaseOperator from airflow.utils.state import TaskInstanceState from airflow.utils.timezone import datetime @@ -44,8 +45,6 @@ class TestWasbRemoteLogIO: @pytest.fixture(autouse=True) def setup_tests(self, create_runtime_ti): - from airflow.sdk import BaseOperator - # setup remote IO self.base_log_folder = "local/airflow/logs" self.wasb_log_folder = "wasb://container@account.blob.core.windows.net/remote/log/location" From 20fa155a05a0a888b8c81f83ee4b430fbb120358 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Fri, 22 Aug 2025 22:26:51 +0800 Subject: [PATCH 03/11] Skip Airflow 2 compat test for RemoteIO --- .../tests/unit/alibaba/cloud/log/test_oss_task_handler.py | 2 ++ .../hdfs/tests/unit/apache/hdfs/log/test_hdfs_task_handler.py | 2 ++ .../google/tests/unit/google/cloud/log/test_gcs_task_handler.py | 2 +- .../tests/unit/microsoft/azure/log/test_wasb_task_handler.py | 1 + 4 files changed, 6 insertions(+), 1 deletion(-) diff --git a/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py b/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py index ea17879525d56..d07dbdf160892 100644 --- a/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py +++ b/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py @@ -31,6 +31,7 @@ from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import clear_db_dags, clear_db_runs +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS if TYPE_CHECKING: from tests_common.pytest_plugin import CreateTaskInstance, DagMaker @@ -46,6 +47,7 @@ MOCK_FILE_PATH = "mock_file_path" +@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="This path only works on Airflow 3") class
TestOSSRemoteLogIO: @pytest.fixture(autouse=True) def setup_tests(self, create_runtime_ti): diff --git a/providers/apache/hdfs/tests/unit/apache/hdfs/log/test_hdfs_task_handler.py b/providers/apache/hdfs/tests/unit/apache/hdfs/log/test_hdfs_task_handler.py index 4ab75136ad398..3bf2f6810f714 100644 --- a/providers/apache/hdfs/tests/unit/apache/hdfs/log/test_hdfs_task_handler.py +++ b/providers/apache/hdfs/tests/unit/apache/hdfs/log/test_hdfs_task_handler.py @@ -32,12 +32,14 @@ from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import clear_db_dags, clear_db_runs +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS pytestmark = pytest.mark.db_test DEFAULT_DATE = datetime(2020, 8, 10) +@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="This path only works on Airflow 3") class TestHdfsRemoteLogIO: @pytest.fixture(autouse=True) def setup_tests(self, create_runtime_ti): diff --git a/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py b/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py index 50177f228adc7..864bfa186e3ec 100644 --- a/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py +++ b/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py @@ -34,7 +34,7 @@ from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS -@pytest.mark.db_test +@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="This path only works on Airflow 3") class TestGCSRemoteLogIO: @pytest.fixture(autouse=True) def setup_tests(self, create_runtime_ti): diff --git a/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py b/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py index d6ac1a9a23fd3..3ad60d5b70b1a 100644 --- a/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py +++ b/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py @@ -42,6 +42,7 @@ DEFAULT_DATE = datetime(2020, 8, 10) +@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="This path only works on Airflow 3") class TestWasbRemoteLogIO: @pytest.fixture(autouse=True) def setup_tests(self, create_runtime_ti): From e17b1d031aaa34dfa77e3924e09a18ef855def3f Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Thu, 4 Sep 2025 12:46:39 +0800 Subject: [PATCH 04/11] Add a new Protocol for .stream method, check attr at runtime --- .../config_templates/airflow_local_settings.py | 4 ++-- airflow-core/src/airflow/logging/remote.py | 7 ++++++- .../src/airflow/utils/log/file_task_handler.py | 12 +++++------- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/airflow-core/src/airflow/config_templates/airflow_local_settings.py b/airflow-core/src/airflow/config_templates/airflow_local_settings.py index 6270fba7ba3ba..e658531fcde91 100644 --- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py +++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py @@ -27,7 +27,7 @@ from airflow.exceptions import AirflowException if TYPE_CHECKING: - from airflow.logging_config import RemoteLogIO + from airflow.logging_config import RemoteLogIO, RemoteLogStreamIO LOG_LEVEL: str = conf.get_mandatory_value("logging", "LOGGING_LEVEL").upper() @@ -119,7 +119,7 @@ ################## REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging") -REMOTE_TASK_LOG: RemoteLogIO | None = None +REMOTE_TASK_LOG: RemoteLogIO | RemoteLogStreamIO | None = None DEFAULT_REMOTE_CONN_ID: str | None = None diff --git
a/airflow-core/src/airflow/logging/remote.py b/airflow-core/src/airflow/logging/remote.py index ed00c15ac3464..c4545bdbb939f 100644 --- a/airflow-core/src/airflow/logging/remote.py +++ b/airflow-core/src/airflow/logging/remote.py @@ -18,7 +18,7 @@ from __future__ import annotations import os -from typing import TYPE_CHECKING, Protocol +from typing import TYPE_CHECKING, Protocol, runtime_checkable if TYPE_CHECKING: import structlog.typing @@ -48,6 +48,11 @@ def read(self, relative_path: str, ti: RuntimeTI) -> LegacyLogResponse: """Read logs from the given remote log path.""" ... + +@runtime_checkable +class RemoteLogStreamIO(Protocol): + """Interface for remote task loggers with stream-based read support.""" + def stream(self, relative_path: str, ti: RuntimeTI) -> LogResponse: """Stream-based read interface for reading logs from the given remote log path.""" ... diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py index 9128d86225e93..507dcbe673249 100644 --- a/airflow-core/src/airflow/utils/log/file_task_handler.py +++ b/airflow-core/src/airflow/utils/log/file_task_handler.py @@ -936,12 +936,10 @@ def _read_remote_logs(self, ti, try_number, metadata=None) -> LegacyLogResponse # This living here is not really a good plan, but it just about works for now. # Ideally we move all the read+combine logic in to TaskLogReader and out of the task handler. path = self._render_filename(ti, try_number) - logs: LogMessages | list[RawLogStream] | None # extra typing to avoid mypy assignment error - try: + if stream_method := getattr(remote_io, "stream"): # Use .stream interface if provider's RemoteIO supports it - sources, logs = remote_io.stream(path, ti) - return sources, logs or [] - except (AttributeError, NotImplementedError): - # Fallback to .read interface - sources, logs = remote_io.read(path, ti) + sources, logs = stream_method(path, ti) return sources, logs or [] + # Fallback to .read interface + sources, logs = remote_io.read(path, ti) + return sources, logs or [] From 659acb23b8e5a910c74ba2af1b7b4b749b09b4e1 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Sat, 6 Sep 2025 01:34:56 +0800 Subject: [PATCH 05/11] Remove NotImplementedError --- .../airflow/providers/alibaba/cloud/log/oss_task_handler.py | 5 +---- .../tests/unit/alibaba/cloud/log/test_oss_task_handler.py | 5 ----- .../providers/amazon/aws/log/cloudwatch_task_handler.py | 5 +---- .../src/airflow/providers/amazon/aws/log/s3_task_handler.py | 5 +---- .../unit/amazon/aws/log/test_cloudwatch_task_handler.py | 5 ----- .../amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py | 5 ----- .../airflow/providers/apache/hdfs/log/hdfs_task_handler.py | 5 +---- .../tests/unit/apache/hdfs/log/test_hdfs_task_handler.py | 5 ----- .../airflow/providers/google/cloud/log/gcs_task_handler.py | 5 +---- .../tests/unit/google/cloud/log/test_gcs_task_handler.py | 5 ----- .../providers/microsoft/azure/log/wasb_task_handler.py | 5 +---- .../tests/unit/microsoft/azure/log/test_wasb_task_handler.py | 5 ----- 12 files changed, 6 insertions(+), 54 deletions(-) diff --git a/providers/alibaba/src/airflow/providers/alibaba/cloud/log/oss_task_handler.py b/providers/alibaba/src/airflow/providers/alibaba/cloud/log/oss_task_handler.py index 2c06c327c514d..e69df54066520 100644 --- a/providers/alibaba/src/airflow/providers/alibaba/cloud/log/oss_task_handler.py +++ b/providers/alibaba/src/airflow/providers/alibaba/cloud/log/oss_task_handler.py @@ -33,7 +33,7 @@ if TYPE_CHECKING: from
airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI - from airflow.utils.log.file_task_handler import LogMessages, LogResponse, LogSourceInfo + from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo @attrs.define(kw_only=True) @@ -95,9 +95,6 @@ def read(self, relative_path, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages return messages, logs return messages, None - def stream(self, relative_path: str, ti: RuntimeTI) -> LogResponse: - raise NotImplementedError - def oss_log_exists(self, remote_log_location): """ Check if remote_log_location exists in remote storage. diff --git a/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py b/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py index d07dbdf160892..1975ff01837a6 100644 --- a/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py +++ b/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py @@ -62,11 +62,6 @@ def setup_tests(self, create_runtime_ti): # setup task instance self.ti = create_runtime_ti(BaseOperator(task_id="task_1")) - def test_stream(self): - """Test that the stream method raises NotImplementedError.""" - with pytest.raises(NotImplementedError): - self.oss_remote_log_io.stream("some/log/path", self.ti) - class TestOSSTaskHandler: def setup_method(self): diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py index 29756516432b0..79d56e7b6ad39 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py @@ -41,7 +41,7 @@ from airflow.models.taskinstance import TaskInstance from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI - from airflow.utils.log.file_task_handler import LogMessages, LogResponse, LogSourceInfo + from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo def json_serialize_legacy(value: Any) -> str | None: @@ -176,9 +176,6 @@ def read(self, relative_path, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages return messages, logs - def stream(self, relative_path: str, ti: RuntimeTI) -> LogResponse: - raise NotImplementedError - def get_cloudwatch_logs(self, stream_name: str, task_instance: RuntimeTI): """ Return all logs from the given log stream. 
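Why removing the stubs matters, beyond tidiness: PATCH 04 switched core to attribute-based detection, and under that scheme a stub stream that raises NotImplementedError still looks like stream support, because a bound method is always truthy. A short hypothetical illustration, with both classes invented for this sketch:

class StubbedIO:
    def stream(self, relative_path, ti):
        raise NotImplementedError  # would be selected by detection, then blow up


class LegacyOnlyIO:
    pass  # no stream attribute at all, so detection can fall back to .read


print(bool(getattr(StubbedIO(), "stream", None)))     # True
print(bool(getattr(LegacyOnlyIO(), "stream", None)))  # False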
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py index 9ecdf6fe7f5d5..caf5fae0962c1 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py @@ -35,7 +35,7 @@ if TYPE_CHECKING: from airflow.models.taskinstance import TaskInstance from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI - from airflow.utils.log.file_task_handler import LogMessages, LogResponse, LogSourceInfo + from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo @attrs.define @@ -164,9 +164,6 @@ def read(self, relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMes return messages, logs return messages, None - def stream(self, relative_path: str, ti: RuntimeTI) -> LogResponse: - raise NotImplementedError - class S3TaskHandler(FileTaskHandler, LoggingMixin): """ diff --git a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py index 60ee2e2cc77df..01eebfa0b1f4f 100644 --- a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py +++ b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py @@ -177,11 +177,6 @@ def test_event_to_str(self): ] ) - def test_stream(self): - """Test that the stream method raises NotImplementedError.""" - with pytest.raises(NotImplementedError): - self.subject.stream("some/log/path", self.ti) - @pytest.mark.db_test class TestCloudwatchTaskHandler: diff --git a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py index cc9a041a38d77..65f70ed1b75aa 100644 --- a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py +++ b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py @@ -193,11 +193,6 @@ def test_write_raises(self, caplog): assert rec.message == f"Could not write logs to {url}" assert rec.exc_info is not None - def test_stream(self): - """Test that the stream method raises NotImplementedError.""" - with pytest.raises(NotImplementedError): - self.subject.stream("some/log/path", self.ti) - @pytest.mark.db_test class TestS3TaskHandler: diff --git a/providers/apache/hdfs/src/airflow/providers/apache/hdfs/log/hdfs_task_handler.py b/providers/apache/hdfs/src/airflow/providers/apache/hdfs/log/hdfs_task_handler.py index 01099900b5728..ed76365ac456a 100644 --- a/providers/apache/hdfs/src/airflow/providers/apache/hdfs/log/hdfs_task_handler.py +++ b/providers/apache/hdfs/src/airflow/providers/apache/hdfs/log/hdfs_task_handler.py @@ -35,7 +35,7 @@ if TYPE_CHECKING: from airflow.models.taskinstance import TaskInstance from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI - from airflow.utils.log.file_task_handler import LogMessages, LogResponse, LogSourceInfo + from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo @attrs.define(kw_only=True) @@ -76,9 +76,6 @@ def read(self, relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMes messages.append(f"No logs found on hdfs for ti={ti}") return messages, logs - def stream(self, relative_path: str, ti: RuntimeTI) -> LogResponse: - raise NotImplementedError - class HdfsTaskHandler(FileTaskHandler, LoggingMixin): """ diff --git a/providers/apache/hdfs/tests/unit/apache/hdfs/log/test_hdfs_task_handler.py 
b/providers/apache/hdfs/tests/unit/apache/hdfs/log/test_hdfs_task_handler.py index 3bf2f6810f714..59e76efa4b398 100644 --- a/providers/apache/hdfs/tests/unit/apache/hdfs/log/test_hdfs_task_handler.py +++ b/providers/apache/hdfs/tests/unit/apache/hdfs/log/test_hdfs_task_handler.py @@ -54,11 +54,6 @@ def setup_tests(self, create_runtime_ti): # setup task instance self.ti = create_runtime_ti(BaseOperator(task_id="task_1")) - def test_stream(self): - """Test that the stream method raises NotImplementedError.""" - with pytest.raises(NotImplementedError): - self.hdfs_remote_log_io.stream("some/log/path", self.ti) - class TestHdfsTaskHandler: @pytest.fixture(autouse=True) diff --git a/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py b/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py index d554085806811..5ad1e0c6d867f 100644 --- a/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py +++ b/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py @@ -45,7 +45,7 @@ if TYPE_CHECKING: from airflow.models.taskinstance import TaskInstance from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI - from airflow.utils.log.file_task_handler import LogMessages, LogResponse, LogSourceInfo + from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo _DEFAULT_SCOPESS = frozenset( [ @@ -177,9 +177,6 @@ def read(self, relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMes messages.append(f"Unable to read remote log {e}") return messages, logs - def stream(self, relative_path: str, ti: RuntimeTI) -> LogResponse: - raise NotImplementedError - class GCSTaskHandler(FileTaskHandler, LoggingMixin): """ diff --git a/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py b/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py index 864bfa186e3ec..7cfa1c1bc5bd6 100644 --- a/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py +++ b/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py @@ -49,11 +49,6 @@ def setup_tests(self, create_runtime_ti): # setup task instance self.ti = create_runtime_ti(BaseOperator(task_id="task_1")) - def test_stream(self): - """Test that the stream method raises NotImplementedError.""" - with pytest.raises(NotImplementedError): - self.gcs_remote_log_io.stream("some/log/path", self.ti) - @pytest.mark.db_test class TestGCSTaskHandler: diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/log/wasb_task_handler.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/log/wasb_task_handler.py index ebf8d25f9b010..a51625eb0396b 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/log/wasb_task_handler.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/log/wasb_task_handler.py @@ -36,7 +36,7 @@ from airflow.models.taskinstance import TaskInstance from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI - from airflow.utils.log.file_task_handler import LogMessages, LogResponse, LogSourceInfo + from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo @attrs.define @@ -121,9 +121,6 @@ def read(self, relative_path, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages self.log.exception("Could not read blob") return messages, logs - def stream(self, relative_path: str, ti: RuntimeTI) -> LogResponse: - raise NotImplementedError - def wasb_log_exists(self, remote_log_location: str) -> bool: """ Check if 
remote_log_location exists in remote storage. diff --git a/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py b/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py index 3ad60d5b70b1a..d98504af155f4 100644 --- a/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py +++ b/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py @@ -59,11 +59,6 @@ def setup_tests(self, create_runtime_ti): # setup task instance self.ti = create_runtime_ti(BaseOperator(task_id="task_1")) - def test_stream(self): - """Test that the stream method raises NotImplementedError.""" - with pytest.raises(NotImplementedError): - self.wasb_remote_log_io.stream("some/log/path", self.ti) - class TestWasbTaskHandler: @pytest.fixture(autouse=True) def ti(self, create_task_instance, create_log_template): From 62a8e7ec9db37e48a550169f206d8e858db12a5d Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Sun, 7 Sep 2025 00:00:10 +0800 Subject: [PATCH 06/11] Clarify naming for log response types --- airflow-core/src/airflow/logging/remote.py | 6 +++--- .../src/airflow/utils/log/file_task_handler.py | 13 ++++++------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/airflow-core/src/airflow/logging/remote.py b/airflow-core/src/airflow/logging/remote.py index c4545bdbb939f..1c9e92f5f7172 100644 --- a/airflow-core/src/airflow/logging/remote.py +++ b/airflow-core/src/airflow/logging/remote.py @@ -24,7 +24,7 @@ import structlog.typing from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI - from airflow.utils.log.file_task_handler import LegacyLogResponse, LogResponse + from airflow.utils.log.file_task_handler import LogResponse, StreamingLogResponse class RemoteLogIO(Protocol): @@ -44,7 +44,7 @@ def upload(self, path: os.PathLike | str, ti: RuntimeTI) -> None: """Upload the given log path to the remote storage.""" ... - def read(self, relative_path: str, ti: RuntimeTI) -> LegacyLogResponse: + def read(self, relative_path: str, ti: RuntimeTI) -> LogResponse: """Read logs from the given remote log path.""" ... @@ -53,6 +53,6 @@ def read(self, relative_path: str, ti: RuntimeTI) -> LegacyLogResponse: class RemoteLogStreamIO(Protocol): """Interface for remote task loggers with stream-based read support.""" - def stream(self, relative_path: str, ti: RuntimeTI) -> LogResponse: + def stream(self, relative_path: str, ti: RuntimeTI) -> StreamingLogResponse: """Stream-based read interface for reading logs from the given remote log path.""" ...
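For readers unfamiliar with @runtime_checkable, which PATCH 04 put on this class: it lets isinstance() test whether an object structurally provides stream(), without the provider ever importing or subclassing the protocol. A minimal sketch with invented classes, separate from the Airflow types above:

from typing import Protocol, runtime_checkable


@runtime_checkable
class StreamCapable(Protocol):
    def stream(self, relative_path: str, ti: object) -> object: ...


class WithStream:
    def stream(self, relative_path, ti):
        return ["source info"], []


class WithoutStream:
    pass


print(isinstance(WithStream(), StreamCapable))     # True: the method is present
print(isinstance(WithoutStream(), StreamCapable))  # False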
diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py index 507dcbe673249..70a1af3e8ad96 100644 --- a/airflow-core/src/airflow/utils/log/file_task_handler.py +++ b/airflow-core/src/airflow/utils/log/file_task_handler.py @@ -73,11 +73,10 @@ """Information _about_ the log fetching process for display to a user""" RawLogStream: TypeAlias = Generator[str, None, None] """Raw log stream, containing unparsed log lines.""" -LegacyLogResponse: TypeAlias = tuple[LogSourceInfo, LogMessages | None] +LogResponse: TypeAlias = tuple[LogSourceInfo, LogMessages | None] """Legacy log response, containing source information and log messages.""" -LogResponse: TypeAlias = tuple[LogSourceInfo, list[RawLogStream]] -LogResponseWithSize: TypeAlias = tuple[LogSourceInfo, list[RawLogStream], int] -"""Log response, containing source information, stream of log lines, and total log size.""" +StreamingLogResponse: TypeAlias = tuple[LogSourceInfo, list[RawLogStream]] +"""Streaming log response, containing source information, stream of log lines.""" StructuredLogStream: TypeAlias = Generator["StructuredLogMessage", None, None] """Structured log stream, containing structured log messages.""" LogHandlerOutputStream: TypeAlias = ( @@ -856,7 +855,7 @@ def _init_file(self, ti, *, identifier: str | None = None): @staticmethod def _read_from_local( worker_log_path: Path, - ) -> LogResponse: + ) -> StreamingLogResponse: sources: LogSourceInfo = [] log_streams: list[RawLogStream] = [] paths = sorted(worker_log_path.parent.glob(worker_log_path.name + "*")) @@ -873,7 +872,7 @@ def _read_from_logs_server( self, ti: TaskInstance | TaskInstanceHistory, worker_log_rel_path: str, - ) -> LogResponse: + ) -> StreamingLogResponse: sources: LogSourceInfo = [] log_streams: list[RawLogStream] = [] try: @@ -911,7 +910,7 @@ def _read_from_logs_server( logger.exception("Could not read served logs") return sources, log_streams - def _read_remote_logs(self, ti, try_number, metadata=None) -> LegacyLogResponse | LogResponse: + def _read_remote_logs(self, ti, try_number, metadata=None) -> LogResponse | StreamingLogResponse: """ Implement in subclasses to read from the remote service. 
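With the renames settled, the contract for stream() is to return (LogSourceInfo, list[RawLogStream]), where each RawLogStream lazily yields unparsed lines. Below is a minimal sketch of a provider-style implementation over a plain local directory; LocalDirLogIO and its layout are invented for illustration and mirror the core _read_from_local helper rather than any real provider:

from __future__ import annotations

from collections.abc import Generator
from pathlib import Path

RawLogStream = Generator[str, None, None]


def _iter_lines(path: Path) -> RawLogStream:
    # Yield lines one at a time so large log files never sit fully in memory.
    with path.open() as f:
        for line in f:
            yield line.rstrip("\n")


class LocalDirLogIO:
    def __init__(self, base_log_folder: str) -> None:
        self.base = Path(base_log_folder)

    def stream(self, relative_path: str, ti: object) -> tuple[list[str], list[RawLogStream]]:
        full_path = self.base / relative_path
        # One generator per rolled-over file, e.g. 1.log, 1.log.1, ...
        paths = sorted(full_path.parent.glob(full_path.name + "*"))
        sources = [str(p) for p in paths]
        return sources, [_iter_lines(p) for p in paths]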
From 5cb2745b2ff02aeac129c6264e94afd114ed6adb Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Mon, 8 Sep 2025 18:27:26 +0800 Subject: [PATCH 07/11] Fix comment for LogMessages --- airflow-core/src/airflow/utils/log/file_task_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py index 70a1af3e8ad96..d155a34706903 100644 --- a/airflow-core/src/airflow/utils/log/file_task_handler.py +++ b/airflow-core/src/airflow/utils/log/file_task_handler.py @@ -68,7 +68,7 @@ # These types are similar, but have distinct names to make processing them less error prone LogMessages: TypeAlias = list[str] -"""The legacy format of log messages before 3.0.2""" +"""The legacy format of log messages before 3.0.4""" LogSourceInfo: TypeAlias = list[str] """Information _about_ the log fetching process for display to a user""" RawLogStream: TypeAlias = Generator[str, None, None] From 6b8c4ce9b1e2c0c21a242d30e3591f5f90afb1f5 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Mon, 15 Sep 2025 09:17:30 -0500 Subject: [PATCH 08/11] Try to make mypy happy --- airflow-core/src/airflow/logging/remote.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/logging/remote.py b/airflow-core/src/airflow/logging/remote.py index 1c9e92f5f7172..5f60fc6ffff43 100644 --- a/airflow-core/src/airflow/logging/remote.py +++ b/airflow-core/src/airflow/logging/remote.py @@ -50,7 +50,7 @@ def read(self, relative_path: str, ti: RuntimeTI) -> LogResponse: @runtime_checkable -class RemoteLogStreamIO(Protocol): +class RemoteLogStreamIO(RemoteLogIO, Protocol): """Interface for remote task loggers with stream-based read support.""" def stream(self, relative_path: str, ti: RuntimeTI) -> StreamingLogResponse: From d72415135eb69931c20167ff03a4c2884d51ca3d Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Wed, 17 Sep 2025 14:02:12 +0800 Subject: [PATCH 09/11] Remove redundant tests --- .../cloud/log/test_oss_task_handler.py | 20 +------------------ .../apache/hdfs/log/test_hdfs_task_handler.py | 20 +------------------ .../google/cloud/log/test_gcs_task_handler.py | 19 +----------------- .../azure/log/test_wasb_task_handler.py | 19 ------------------ 4 files changed, 3 insertions(+), 75 deletions(-) diff --git a/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py b/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py index 1975ff01837a6..1fcc9be4f8962 100644 --- a/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py +++ b/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py @@ -24,14 +24,12 @@ import pytest -from airflow.providers.alibaba.cloud.log.oss_task_handler import OSSRemoteLogIO, OSSTaskHandler -from airflow.providers.alibaba.version_compat import BaseOperator +from airflow.providers.alibaba.cloud.log.oss_task_handler import OSSTaskHandler from airflow.utils.state import TaskInstanceState from airflow.utils.timezone import datetime from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import clear_db_dags, clear_db_runs -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS if TYPE_CHECKING: from tests_common.pytest_plugin import CreateTaskInstance, DagMaker @@ -47,22 +45,6 @@ MOCK_FILE_PATH = "mock_file_path" -@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="This path only works on Airflow 3") -class TestOSSRemoteLogIO: - @pytest.fixture(autouse=True) 
- def setup_tests(self, create_runtime_ti): - # setup remote IO - self.base_log_folder = "local/airflow/logs" - self.oss_log_folder = f"oss://{MOCK_BUCKET_NAME}/airflow/logs" - self.oss_remote_log_io = OSSRemoteLogIO( - remote_base=self.oss_log_folder, - base_log_folder=self.base_log_folder, - delete_local_copy=True, - ) - # setup task instance - self.ti = create_runtime_ti(BaseOperator(task_id="task_1")) - - class TestOSSTaskHandler: def setup_method(self): self.base_log_folder = "local/airflow/logs/1.log" diff --git a/providers/apache/hdfs/tests/unit/apache/hdfs/log/test_hdfs_task_handler.py b/providers/apache/hdfs/tests/unit/apache/hdfs/log/test_hdfs_task_handler.py index 59e76efa4b398..b5e2adb757797 100644 --- a/providers/apache/hdfs/tests/unit/apache/hdfs/log/test_hdfs_task_handler.py +++ b/providers/apache/hdfs/tests/unit/apache/hdfs/log/test_hdfs_task_handler.py @@ -25,36 +25,18 @@ import pytest from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook -from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsRemoteLogIO, HdfsTaskHandler -from airflow.providers.apache.hdfs.version_compat import BaseOperator +from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsTaskHandler from airflow.utils.state import TaskInstanceState from airflow.utils.timezone import datetime from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import clear_db_dags, clear_db_runs -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS pytestmark = pytest.mark.db_test DEFAULT_DATE = datetime(2020, 8, 10) -@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="This path only works on Airflow 3") -class TestHdfsRemoteLogIO: - @pytest.fixture(autouse=True) - def setup_tests(self, create_runtime_ti): - # setup remote IO - self.base_log_folder = "local/airflow/logs" - self.remote_base = "/remote/log/location" - self.hdfs_remote_log_io = HdfsRemoteLogIO( - remote_base=self.remote_base, - base_log_folder=self.base_log_folder, - delete_local_copy=True, - ) - # setup task instance - self.ti = create_runtime_ti(BaseOperator(task_id="task_1")) - - class TestHdfsTaskHandler: @pytest.fixture(autouse=True) def ti(self, create_task_instance, create_log_template): diff --git a/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py b/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py index 7cfa1c1bc5bd6..135d443244559 100644 --- a/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py +++ b/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py @@ -24,8 +24,7 @@ import pytest -from airflow.providers.google.cloud.log.gcs_task_handler import GCSRemoteLogIO, GCSTaskHandler -from airflow.providers.google.version_compat import BaseOperator +from airflow.providers.google.cloud.log.gcs_task_handler import GCSTaskHandler from airflow.utils.state import TaskInstanceState from airflow.utils.timezone import datetime @@ -34,22 +33,6 @@ from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS -@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="This path only works on Airflow 3") -class TestGCSRemoteLogIO: - @pytest.fixture(autouse=True) - def setup_tests(self, create_runtime_ti): - # setup remote IO - self.base_log_folder = "local/airflow/logs" - self.gcs_log_folder = "gs://bucket/remote/log/location" - self.gcs_remote_log_io = GCSRemoteLogIO( - remote_base=self.gcs_log_folder, - base_log_folder=self.base_log_folder, - delete_local_copy=True, - ) - # setup task instance - self.ti = 
create_runtime_ti(BaseOperator(task_id="task_1")) - - @pytest.mark.db_test class TestGCSTaskHandler: @pytest.fixture(autouse=True) diff --git a/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py b/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py index d98504af155f4..4a09b28983375 100644 --- a/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py +++ b/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py @@ -28,7 +28,6 @@ from airflow.providers.microsoft.azure.hooks.wasb import WasbHook from airflow.providers.microsoft.azure.log.wasb_task_handler import WasbRemoteLogIO, WasbTaskHandler -from airflow.providers.microsoft.azure.version_compat import BaseOperator from airflow.utils.state import TaskInstanceState from airflow.utils.timezone import datetime @@ -42,24 +41,6 @@ DEFAULT_DATE = datetime(2020, 8, 10) -@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="This path only works on Airflow 3") -class TestWasbRemoteLogIO: - @pytest.fixture(autouse=True) - def setup_tests(self, create_runtime_ti): - # setup remote IO - self.base_log_folder = "local/airflow/logs" - self.wasb_log_folder = "wasb://container@account.blob.core.windows.net/remote/log/location" - self.wasb_container = "container" - self.wasb_remote_log_io = WasbRemoteLogIO( - remote_base=self.wasb_log_folder, - base_log_folder=self.base_log_folder, - delete_local_copy=True, - wasb_container=self.wasb_container, - ) - # setup task instance - self.ti = create_runtime_ti(BaseOperator(task_id="task_1")) - - class TestWasbTaskHandler: @pytest.fixture(autouse=True) def ti(self, create_task_instance, create_log_template): From 25219c457567485e2b5b71dd467597d949e1c1c7 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Thu, 23 Oct 2025 10:34:15 +0800 Subject: [PATCH 10/11] Check hasattr before getattr --- airflow-core/src/airflow/utils/log/file_task_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py index d155a34706903..4697fcb002b32 100644 --- a/airflow-core/src/airflow/utils/log/file_task_handler.py +++ b/airflow-core/src/airflow/utils/log/file_task_handler.py @@ -935,7 +935,7 @@ def _read_remote_logs(self, ti, try_number, metadata=None) -> LogResponse | Stre # This living here is not really a good plan, but it just about works for now. # Ideally we move all the read+combine logic in to TaskLogReader and out of the task handler. 
path = self._render_filename(ti, try_number) - if stream_method := getattr(remote_io, "stream"): + if hasattr(remote_io, "stream") and (stream_method := getattr(remote_io, "stream")): # Use .stream interface if provider's RemoteIO supports it sources, logs = stream_method(path, ti) return sources, logs or [] From 00fcf78f418e8b9ccc2ab039198bc14aa514c718 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Mon, 10 Nov 2025 19:49:35 +0800 Subject: [PATCH 11/11] Fix nits for using getattr --- airflow-core/src/airflow/utils/log/file_task_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py index 4697fcb002b32..44ccf2cde3803 100644 --- a/airflow-core/src/airflow/utils/log/file_task_handler.py +++ b/airflow-core/src/airflow/utils/log/file_task_handler.py @@ -935,7 +935,7 @@ def _read_remote_logs(self, ti, try_number, metadata=None) -> LogResponse | Stre # This living here is not really a good plan, but it just about works for now. # Ideally we move all the read+combine logic in to TaskLogReader and out of the task handler. path = self._render_filename(ti, try_number) - if hasattr(remote_io, "stream") and (stream_method := getattr(remote_io, "stream")): + if stream_method := getattr(remote_io, "stream", None): # Use .stream interface if provider's RemoteIO supports it sources, logs = stream_method(path, ti) return sources, logs or []
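The getattr detail the last two patches settle, shown on its own: the two-argument form raises AttributeError when the attribute is missing, so the PATCH 04 version could never reach its .read fallback once PATCH 05 removed the stubs; passing a None default makes the check total, and PATCH 10's hasattr-plus-getattr was a redundant spelling of the same thing. LegacyOnlyIO below is a hypothetical provider without stream support:

class LegacyOnlyIO:
    def read(self, relative_path, ti):
        return ["source info"], ["a log line"]


remote_io = LegacyOnlyIO()

try:
    getattr(remote_io, "stream")  # two-argument form: raises on a missing attribute
except AttributeError:
    print("two-arg getattr raised AttributeError")

if stream_method := getattr(remote_io, "stream", None):
    sources, logs = stream_method("some/log/path", None)
else:
    sources, logs = remote_io.read("some/log/path", None)  # fallback is now reachable
print(sources, logs or [])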