From ea88aaaf3e67f34da7365d5784c1d91c02310fbc Mon Sep 17 00:00:00 2001 From: Kacper Muda Date: Fri, 19 Jul 2024 13:41:05 +0200 Subject: [PATCH] openlineage: replace dt.now with airflow.utils.timezone.utcnow Signed-off-by: Kacper Muda --- airflow/providers/openlineage/plugins/listener.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py index 71005e37a756b..8798c542c1cd2 100644 --- a/airflow/providers/openlineage/plugins/listener.py +++ b/airflow/providers/openlineage/plugins/listener.py @@ -19,7 +19,6 @@ import logging import os from concurrent.futures import ProcessPoolExecutor -from datetime import datetime from typing import TYPE_CHECKING import psutil @@ -43,6 +42,7 @@ ) from airflow.settings import configure_orm from airflow.stats import Stats +from airflow.utils import timezone from airflow.utils.timeout import timeout if TYPE_CHECKING: @@ -145,7 +145,7 @@ def on_running(): with Stats.timer(f"ol.extract.{event_type}.{operator_name}"): task_metadata = self.extractor_manager.extract_metadata(dagrun, task) - start_date = task_instance.start_date if task_instance.start_date else datetime.now() + start_date = task_instance.start_date if task_instance.start_date else timezone.utcnow() data_interval_start = ( dagrun.data_interval_start.isoformat() if dagrun.data_interval_start else None ) @@ -224,7 +224,7 @@ def on_success(): dagrun, task, complete=True, task_instance=task_instance ) - end_date = task_instance.end_date if task_instance.end_date else datetime.now() + end_date = task_instance.end_date if task_instance.end_date else timezone.utcnow() redacted_event = self.adapter.complete_task( run_id=task_uuid, @@ -318,7 +318,7 @@ def on_failure(): dagrun, task, complete=True, task_instance=task_instance ) - end_date = task_instance.end_date if task_instance.end_date else datetime.now() + end_date = task_instance.end_date if task_instance.end_date else timezone.utcnow() redacted_event = self.adapter.fail_task( run_id=task_uuid,