diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py index 71005e37a756b..8798c542c1cd2 100644 --- a/airflow/providers/openlineage/plugins/listener.py +++ b/airflow/providers/openlineage/plugins/listener.py @@ -19,7 +19,6 @@ import logging import os from concurrent.futures import ProcessPoolExecutor -from datetime import datetime from typing import TYPE_CHECKING import psutil @@ -43,6 +42,7 @@ ) from airflow.settings import configure_orm from airflow.stats import Stats +from airflow.utils import timezone from airflow.utils.timeout import timeout if TYPE_CHECKING: @@ -145,7 +145,7 @@ def on_running(): with Stats.timer(f"ol.extract.{event_type}.{operator_name}"): task_metadata = self.extractor_manager.extract_metadata(dagrun, task) - start_date = task_instance.start_date if task_instance.start_date else datetime.now() + start_date = task_instance.start_date if task_instance.start_date else timezone.utcnow() data_interval_start = ( dagrun.data_interval_start.isoformat() if dagrun.data_interval_start else None ) @@ -224,7 +224,7 @@ def on_success(): dagrun, task, complete=True, task_instance=task_instance ) - end_date = task_instance.end_date if task_instance.end_date else datetime.now() + end_date = task_instance.end_date if task_instance.end_date else timezone.utcnow() redacted_event = self.adapter.complete_task( run_id=task_uuid, @@ -318,7 +318,7 @@ def on_failure(): dagrun, task, complete=True, task_instance=task_instance ) - end_date = task_instance.end_date if task_instance.end_date else datetime.now() + end_date = task_instance.end_date if task_instance.end_date else timezone.utcnow() redacted_event = self.adapter.fail_task( run_id=task_uuid,