From e8d5b5b32bb222f3bb9491e649c38a6553319370 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 31 Jul 2023 21:50:09 +0530 Subject: [PATCH 1/4] Extend task context logging support for remote logging using WASB With the addition of task context logging feature in PR #32646, this PR extends the feature to Azure Blob Storage (WASB) when is it set as remote logging store. Here, backward compatibility is ensured for older versions of Airflow that do not have the feature included in Airflow Core. --- .../microsoft/azure/log/wasb_task_handler.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/airflow/providers/microsoft/azure/log/wasb_task_handler.py b/airflow/providers/microsoft/azure/log/wasb_task_handler.py index 97a8af5ae1d67..c5270daae684d 100644 --- a/airflow/providers/microsoft/azure/log/wasb_task_handler.py +++ b/airflow/providers/microsoft/azure/log/wasb_task_handler.py @@ -31,6 +31,9 @@ from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.logging_mixin import LoggingMixin +if TYPE_CHECKING: + from airflow.models.taskinstance import TaskInstance + def get_default_delete_local_copy(): """Load delete_local_logs conf if Airflow version > 2.6 and return False if not. @@ -92,8 +95,11 @@ def hook(self): ) return None - def set_context(self, ti) -> None: - super().set_context(ti) + def set_context(self, ti: TaskInstance, identifier=None) -> None: + if getattr(self, "supports_task_context_logging", False): + super().set_context(ti, identifier=identifier) + else: + super().set_context(ti) # Local location and remote location is needed to open and # upload local log file to Wasb remote storage. if TYPE_CHECKING: @@ -102,7 +108,7 @@ def set_context(self, ti) -> None: full_path = self.handler.baseFilename self.log_relative_path = Path(full_path).relative_to(self.local_base).as_posix() is_trigger_log_context = getattr(ti, "is_trigger_log_context", False) - self.upload_on_close = is_trigger_log_context or not ti.raw + self.upload_on_close = is_trigger_log_context or not getattr(ti, "raw", None) def close(self) -> None: """Close and upload local log file to remote storage Wasb.""" From 24176cd0f79c28802f888528acbc05b8794f9fab Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 17 Nov 2023 17:58:40 +0530 Subject: [PATCH 2/4] Update airflow/providers/microsoft/azure/log/wasb_task_handler.py --- airflow/providers/microsoft/azure/log/wasb_task_handler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/providers/microsoft/azure/log/wasb_task_handler.py b/airflow/providers/microsoft/azure/log/wasb_task_handler.py index f7462161165b3..22fa76fcbc0a2 100644 --- a/airflow/providers/microsoft/azure/log/wasb_task_handler.py +++ b/airflow/providers/microsoft/azure/log/wasb_task_handler.py @@ -32,6 +32,7 @@ if TYPE_CHECKING: import logging + from airflow.models.taskinstance import TaskInstance From 4f822b90cd654769c55228986436e90c602325ef Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 17 Nov 2023 17:59:52 +0530 Subject: [PATCH 3/4] Update airflow/providers/microsoft/azure/log/wasb_task_handler.py --- airflow/providers/microsoft/azure/log/wasb_task_handler.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow/providers/microsoft/azure/log/wasb_task_handler.py b/airflow/providers/microsoft/azure/log/wasb_task_handler.py index 22fa76fcbc0a2..f7462161165b3 100644 --- a/airflow/providers/microsoft/azure/log/wasb_task_handler.py +++ b/airflow/providers/microsoft/azure/log/wasb_task_handler.py @@ -32,7 +32,6 @@ if TYPE_CHECKING: import logging - from airflow.models.taskinstance import TaskInstance From 1ca25da1be0a9d73c2128c58f6cb76b86965a61c Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 17 Nov 2023 18:00:20 +0530 Subject: [PATCH 4/4] Update airflow/providers/microsoft/azure/log/wasb_task_handler.py --- airflow/providers/microsoft/azure/log/wasb_task_handler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/providers/microsoft/azure/log/wasb_task_handler.py b/airflow/providers/microsoft/azure/log/wasb_task_handler.py index f7462161165b3..f3a00e8432b9d 100644 --- a/airflow/providers/microsoft/azure/log/wasb_task_handler.py +++ b/airflow/providers/microsoft/azure/log/wasb_task_handler.py @@ -32,6 +32,7 @@ if TYPE_CHECKING: import logging + from airflow.models.taskinstance import TaskInstance