From 9e25b81f4fbb8e077fcd86aff974461fe9f6619f Mon Sep 17 00:00:00 2001
From: Pankaj Koti
Date: Sun, 30 Jul 2023 21:07:21 +0530
Subject: [PATCH 1/2] Add task context logging support for remote AWS S3
 logging

With the addition of the task context logging feature in PR #32646,
this PR extends the feature to AWS S3 when it is set as the remote
logging store. Backward compatibility is ensured for older versions
of Airflow that do not have the feature included in Airflow Core.
---
 airflow/providers/amazon/aws/log/s3_task_handler.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/airflow/providers/amazon/aws/log/s3_task_handler.py b/airflow/providers/amazon/aws/log/s3_task_handler.py
index d949a8b7b25f4..4996732d4502f 100644
--- a/airflow/providers/amazon/aws/log/s3_task_handler.py
+++ b/airflow/providers/amazon/aws/log/s3_task_handler.py
@@ -71,14 +71,17 @@ def hook(self):
             aws_conn_id=conf.get("logging", "REMOTE_LOG_CONN_ID"), transfer_config_args={"use_threads": False}
         )
 
-    def set_context(self, ti):
-        super().set_context(ti)
+    def set_context(self, ti, identifier=None):
+        if getattr(self, "supports_task_context_logging", False):
+            super().set_context(ti, identifier=identifier)
+        else:
+            super().set_context(ti)
         # Local location and remote location is needed to open and
         # upload local log file to S3 remote storage.
         full_path = self.handler.baseFilename
         self.log_relative_path = pathlib.Path(full_path).relative_to(self.local_base).as_posix()
         is_trigger_log_context = getattr(ti, "is_trigger_log_context", False)
-        self.upload_on_close = is_trigger_log_context or not ti.raw
+        self.upload_on_close = is_trigger_log_context or not getattr(ti, "raw", None)
         # Clear the file first so that duplicate data is not uploaded
         # when re-using the same path (e.g. with rescheduled sensors)
         if self.upload_on_close:

From cb83eb1e41806dda0a09a8955aa05f166742ba0d Mon Sep 17 00:00:00 2001
From: Pankaj Koti
Date: Mon, 31 Jul 2023 22:52:21 +0530
Subject: [PATCH 2/2] Add type annotations

---
 airflow/providers/amazon/aws/log/s3_task_handler.py | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/amazon/aws/log/s3_task_handler.py b/airflow/providers/amazon/aws/log/s3_task_handler.py
index 4996732d4502f..761c4ce463814 100644
--- a/airflow/providers/amazon/aws/log/s3_task_handler.py
+++ b/airflow/providers/amazon/aws/log/s3_task_handler.py
@@ -17,10 +17,12 @@
 # under the License.
 from __future__ import annotations
 
+import logging
 import os
 import pathlib
 import shutil
 from functools import cached_property
+from typing import TYPE_CHECKING
 
 from packaging.version import Version
 
@@ -29,6 +31,9 @@
 from airflow.utils.log.file_task_handler import FileTaskHandler
 from airflow.utils.log.logging_mixin import LoggingMixin
 
+if TYPE_CHECKING:
+    from airflow.models.taskinstance import TaskInstance
+
 
 def get_default_delete_local_copy():
     """Load delete_local_logs conf if Airflow version > 2.6 and return False if not.
@@ -55,6 +60,7 @@ def __init__(
         self, base_log_folder: str, s3_log_folder: str, filename_template: str | None = None, **kwargs
     ):
         super().__init__(base_log_folder, filename_template)
+        self.handler: logging.FileHandler | None = None
         self.remote_base = s3_log_folder
         self.log_relative_path = ""
         self._hook = None
@@ -71,13 +77,16 @@ def hook(self):
             aws_conn_id=conf.get("logging", "REMOTE_LOG_CONN_ID"), transfer_config_args={"use_threads": False}
         )
 
-    def set_context(self, ti, identifier=None):
+    def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> None:
         if getattr(self, "supports_task_context_logging", False):
             super().set_context(ti, identifier=identifier)
         else:
             super().set_context(ti)
         # Local location and remote location is needed to open and
         # upload local log file to S3 remote storage.
+        if TYPE_CHECKING:
+            assert self.handler is not None
+
         full_path = self.handler.baseFilename
         self.log_relative_path = pathlib.Path(full_path).relative_to(self.local_base).as_posix()
         is_trigger_log_context = getattr(ti, "is_trigger_log_context", False)
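
For readers unfamiliar with the compatibility shim these patches introduce, the pattern can be shown in isolation. The sketch below is a minimal, hypothetical illustration rather than the provider's actual code: OldBaseHandler, NewBaseHandler, and make_compat_handler are invented names, and the only assumption carried over from the diff is that newer Airflow cores expose a truthy supports_task_context_logging attribute on the base handler, while older cores lack both the attribute and the identifier parameter.

    # Minimal sketch of the feature-detection pattern from the patches above.
    # All class and function names here are hypothetical; only the attribute
    # name "supports_task_context_logging" is taken from the diff.
    from __future__ import annotations


    class OldBaseHandler:
        # Stands in for an older FileTaskHandler: no identifier support.
        def set_context(self, ti) -> None:
            print(f"legacy set_context(ti={ti!r})")


    class NewBaseHandler:
        # Stands in for a newer FileTaskHandler that accepts an identifier.
        supports_task_context_logging = True

        def set_context(self, ti, *, identifier: str | None = None) -> None:
            print(f"set_context(ti={ti!r}, identifier={identifier!r})")


    def make_compat_handler(base: type) -> type:
        # Build a subclass whose set_context works against either base class.
        class CompatHandler(base):
            def set_context(self, ti, *, identifier: str | None = None) -> None:
                # Feature-detect rather than version-check: forward the
                # identifier only when the base class advertises support.
                if getattr(self, "supports_task_context_logging", False):
                    super().set_context(ti, identifier=identifier)
                else:
                    super().set_context(ti)

        return CompatHandler


    if __name__ == "__main__":
        make_compat_handler(NewBaseHandler)().set_context("ti", identifier="trigger")
        make_compat_handler(OldBaseHandler)().set_context("ti", identifier="ignored")

Detecting the capability with getattr, instead of parsing the installed Airflow version, keeps the provider working against any core release: the identifier is forwarded exactly when the base class can accept it, and silently dropped otherwise.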