From f52a769df44808db162b17d8bf90be5340c59231 Mon Sep 17 00:00:00 2001 From: oboki Date: Tue, 20 May 2025 21:32:41 +0900 Subject: [PATCH] fix: resolve 404 log error for non-latest task tries in multi-host worker environments (#50175) * fix: resolve 404 log error for non-latest task tries in multi-host worker environments * refactor: extract TaskInstance and TaskInstanceHistory query logic from `get_log` endpoint function * test: add unit test for `get_task_instance_or_history_for_try_number` function * fix: resolve sqlite lock error #50763 Co-authored-by: LIU ZHE YOU <68415893+jason810496@users.noreply.github.com> --------- Co-authored-by: LIU ZHE YOU <68415893+jason810496@users.noreply.github.com> (cherry picked from commit 0b0ff5d11802a57b136f159cb6753a16dbe5fe07) --- .../api_fastapi/common/db/task_instance.py | 57 +++++++++++++ .../api_fastapi/core_api/routes/public/log.py | 32 ++------ .../airflow/utils/log/file_task_handler.py | 4 +- .../unit/api_fastapi/common/db/__init__.py | 16 ++++ .../common/db/test_task_instance.py | 79 +++++++++++++++++++ 5 files changed, 163 insertions(+), 25 deletions(-) create mode 100644 airflow-core/src/airflow/api_fastapi/common/db/task_instance.py create mode 100644 airflow-core/tests/unit/api_fastapi/common/db/__init__.py create mode 100644 airflow-core/tests/unit/api_fastapi/common/db/test_task_instance.py diff --git a/airflow-core/src/airflow/api_fastapi/common/db/task_instance.py b/airflow-core/src/airflow/api_fastapi/common/db/task_instance.py new file mode 100644 index 0000000000000..b6748bdd61a4c --- /dev/null +++ b/airflow-core/src/airflow/api_fastapi/common/db/task_instance.py @@ -0,0 +1,57 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from pydantic import PositiveInt
+from sqlalchemy.orm import joinedload
+from sqlalchemy.sql import select
+
+from airflow.api_fastapi.common.db.common import SessionDep
+from airflow.models import TaskInstance, Trigger
+from airflow.models.taskinstancehistory import TaskInstanceHistory
+
+
+def get_task_instance_or_history_for_try_number(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    try_number: PositiveInt,
+    session: SessionDep,
+    map_index: int,
+) -> TaskInstance | TaskInstanceHistory | None:
+    query = (  # current TaskInstance row for this task, whatever its try number is
+        select(TaskInstance)
+        .where(
+            TaskInstance.task_id == task_id,
+            TaskInstance.dag_id == dag_id,
+            TaskInstance.run_id == dag_run_id,
+            TaskInstance.map_index == map_index,
+        )
+        .join(TaskInstance.dag_run)
+        .options(joinedload(TaskInstance.trigger).joinedload(Trigger.triggerer_job))
+    )
+    ti = session.scalar(query)
+    if ti is None or ti.try_number != try_number:  # not the requested try: fall back to history
+        query = select(TaskInstanceHistory).where(
+            TaskInstanceHistory.task_id == task_id,
+            TaskInstanceHistory.dag_id == dag_id,
+            TaskInstanceHistory.run_id == dag_run_id,
+            TaskInstanceHistory.map_index == map_index,
+            TaskInstanceHistory.try_number == try_number,
+        )
+        ti = session.scalar(query)
+    return ti  # None when neither table has a row for the requested try
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py
index 3873cadf69d76..da459dd95cf31
100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py @@ -23,11 +23,10 @@ from fastapi import Depends, HTTPException, Request, Response, status from itsdangerous import BadSignature, URLSafeSerializer from pydantic import PositiveInt -from sqlalchemy.orm import joinedload -from sqlalchemy.sql import select from airflow.api_fastapi.common.dagbag import DagBagDep from airflow.api_fastapi.common.db.common import SessionDep +from airflow.api_fastapi.common.db.task_instance import get_task_instance_or_history_for_try_number from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.common.types import Mimetype @@ -35,8 +34,6 @@ from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.api_fastapi.core_api.security import DagAccessEntity, requires_access_dag from airflow.exceptions import TaskNotFound -from airflow.models import TaskInstance, Trigger -from airflow.models.taskinstancehistory import TaskInstanceHistory from airflow.utils.log.log_reader import TaskLogReader task_instances_log_router = AirflowRouter( @@ -104,27 +101,14 @@ def get_log( if not task_log_reader.supports_read: raise HTTPException(status.HTTP_400_BAD_REQUEST, "Task log handler does not support read logs.") - query = ( - select(TaskInstance) - .where( - TaskInstance.task_id == task_id, - TaskInstance.dag_id == dag_id, - TaskInstance.run_id == dag_run_id, - TaskInstance.map_index == map_index, - ) - .join(TaskInstance.dag_run) - .options(joinedload(TaskInstance.trigger).joinedload(Trigger.triggerer_job)) + ti = get_task_instance_or_history_for_try_number( + dag_id=dag_id, + dag_run_id=dag_run_id, + task_id=task_id, + try_number=try_number, + session=session, + map_index=map_index, ) - ti = session.scalar(query) - if ti is None: - query = 
select(TaskInstanceHistory).where( - TaskInstanceHistory.task_id == task_id, - TaskInstanceHistory.dag_id == dag_id, - TaskInstanceHistory.run_id == dag_run_id, - TaskInstanceHistory.map_index == map_index, - TaskInstanceHistory.try_number == try_number, - ) - ti = session.scalar(query) if ti is None: metadata["end_of_log"] = True diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py index 5314aa859c13f..56f1387a1dc95 100644 --- a/airflow-core/src/airflow/utils/log/file_task_handler.py +++ b/airflow-core/src/airflow/utils/log/file_task_handler.py @@ -586,7 +586,9 @@ def _read_from_logs_server(self, ti, worker_log_rel_path) -> tuple[LogSourceInfo sources = [] logs = [] try: - log_type = LogType.TRIGGER if ti.triggerer_job else LogType.WORKER + log_type = ( + LogType.TRIGGER if hasattr(ti, "triggerer_job") and ti.triggerer_job else LogType.WORKER + ) url, rel_path = self._get_log_retrieval_url(ti, worker_log_rel_path, log_type=log_type) response = _fetch_logs_from_service(url, rel_path) if response.status_code == 403: diff --git a/airflow-core/tests/unit/api_fastapi/common/db/__init__.py b/airflow-core/tests/unit/api_fastapi/common/db/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow-core/tests/unit/api_fastapi/common/db/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow-core/tests/unit/api_fastapi/common/db/test_task_instance.py b/airflow-core/tests/unit/api_fastapi/common/db/test_task_instance.py new file mode 100644 index 0000000000000..e5ed829dc8f4b --- /dev/null +++ b/airflow-core/tests/unit/api_fastapi/common/db/test_task_instance.py @@ -0,0 +1,79 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+
+from __future__ import annotations
+
+import pytest
+
+from airflow.api_fastapi.common.db.task_instance import get_task_instance_or_history_for_try_number
+from airflow.models.taskinstance import TaskInstance
+from airflow.models.taskinstancehistory import TaskInstanceHistory
+from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.utils import timezone
+from airflow.utils.types import DagRunType
+
+from tests_common.test_utils.db import clear_db_runs
+
+pytestmark = pytest.mark.db_test
+
+
+class TestDBTaskInstance:
+    DAG_ID = "dag_for_testing_db_task_instance"
+    RUN_ID = "dag_run_id_for_testing_db_task_instance"
+    TASK_ID = "task_for_testing_db_task_instance"
+    TRY_NUMBER = 1
+
+    default_time = "2020-06-10T20:00:00+00:00"
+
+    @pytest.fixture(autouse=True)
+    def setup_attrs(self, dag_maker, session) -> None:
+        with dag_maker(self.DAG_ID, start_date=timezone.parse(self.default_time), session=session) as dag:
+            EmptyOperator(task_id=self.TASK_ID)
+
+        dr = dag_maker.create_dagrun(
+            run_id=self.RUN_ID,
+            run_type=DagRunType.SCHEDULED,
+            logical_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+        )
+
+        for ti in dr.task_instances:
+            ti.try_number = 1
+            ti.hostname = "localhost"
+            session.merge(ti)
+        dag.clear()  # NOTE(review): expected to archive try 1 into TaskInstanceHistory; confirm
+        for ti in dr.task_instances:
+            ti.try_number = 2
+            ti.hostname = "localhost"
+            session.merge(ti)
+        session.commit()
+
+    def teardown_method(self):
+        clear_db_runs()
+
+    @pytest.mark.parametrize("try_number", [1, 2])  # 1 -> history row, 2 -> current TI row
+    def test_get_task_instance_or_history_for_try_number(self, try_number, session):
+        ti = get_task_instance_or_history_for_try_number(
+            self.DAG_ID,
+            self.RUN_ID,
+            self.TASK_ID,
+            try_number,
+            session=session,
+            map_index=-1,
+        )
+
+        assert isinstance(ti, TaskInstanceHistory if try_number == 1 else TaskInstance)