diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 1496b31fac682..bf620e9be2556 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -340,7 +340,8 @@ def _read( if metadata and "log_pos" in metadata: previous_chars = metadata["log_pos"] logs = logs[previous_chars:] # Cut off previously passed log test as new tail - return messages + logs, {"end_of_log": end_of_log, "log_pos": log_pos} + out_message = logs if "log_pos" in (metadata or {}) else messages + logs + return out_message, {"end_of_log": end_of_log, "log_pos": log_pos} @staticmethod def _get_pod_namespace(ti: TaskInstance): diff --git a/airflow/www/views.py b/airflow/www/views.py index 2ee931372701e..543708b909f92 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -1527,7 +1527,15 @@ def get_logs_with_metadata(self, session=None): ti = ( session.query(models.TaskInstance) - .filter_by(dag_id=dag_id, task_id=task_id, execution_date=execution_date, map_index=map_index) + .filter( + TaskInstance.task_id == task_id, + TaskInstance.dag_id == dag_id, + TaskInstance.execution_date == execution_date, + TaskInstance.map_index == map_index, + ) + .join(TaskInstance.dag_run) + .options(joinedload("trigger")) + .options(joinedload("trigger.triggerer_job")) .first() )