From fa032107f8ca6707f0f484bfbd10924cb374fd0a Mon Sep 17 00:00:00 2001 From: Jens Scheffler <95105677+jscheffl@users.noreply.github.com> Date: Sat, 11 Jan 2025 18:51:21 +0100 Subject: [PATCH 1/2] [v2-10-test] Ensure teardown tasks are executed when DAG run is set to failed (#45530) * Ensure teardown tasks are executed when DAG run is set to failed * Also handle the case of setting DAG to success * Add some documentation to behavior changes * Add some documentation to behavior changes (cherry picked from commit 1e8977a2ea24e989c6c57ee3cb8e7b6bc4cf6c56) Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com> --- airflow/api/common/mark_tasks.py | 41 +++++----- .../howto/setup-and-teardown.rst | 8 +- newsfragments/45530.significant.rst | 12 +++ tests/api/common/test_mark_tasks.py | 76 +++++++++++++++++++ 4 files changed, 119 insertions(+), 18 deletions(-) create mode 100644 newsfragments/45530.significant.rst create mode 100644 tests/api/common/test_mark_tasks.py diff --git a/airflow/api/common/mark_tasks.py b/airflow/api/common/mark_tasks.py index 58ca737a5715b..6b656e85a6940 100644 --- a/airflow/api/common/mark_tasks.py +++ b/airflow/api/common/mark_tasks.py @@ -411,15 +411,18 @@ def set_dag_run_state_to_success( run_id = dag_run.run_id if not run_id: raise ValueError(f"Invalid dag_run_id: {run_id}") + + # Mark all task instances of the dag run to success - except for teardown as they need to complete work. + normal_tasks = [task for task in dag.tasks if not task.is_teardown] + # Mark the dag run to success. - if commit: + if commit and len(normal_tasks) == len(dag.tasks): _set_dag_run_state(dag.dag_id, run_id, DagRunState.SUCCESS, session) - # Mark all task instances of the dag run to success. 
- for task in dag.tasks: + for task in normal_tasks: task.dag = dag return set_state( - tasks=dag.tasks, + tasks=normal_tasks, run_id=run_id, state=TaskInstanceState.SUCCESS, commit=commit, @@ -466,10 +469,6 @@ def set_dag_run_state_to_failed( if not run_id: raise ValueError(f"Invalid dag_run_id: {run_id}") - # Mark the dag run to failed. - if commit: - _set_dag_run_state(dag.dag_id, run_id, DagRunState.FAILED, session) - running_states = ( TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED, @@ -478,25 +477,26 @@ def set_dag_run_state_to_failed( # Mark only RUNNING task instances. task_ids = [task.task_id for task in dag.tasks] - tis = session.scalars( + running_tis: list[TaskInstance] = session.scalars( select(TaskInstance).where( TaskInstance.dag_id == dag.dag_id, TaskInstance.run_id == run_id, TaskInstance.task_id.in_(task_ids), TaskInstance.state.in_(running_states), ) - ) + ).all() - task_ids_of_running_tis = [task_instance.task_id for task_instance in tis] + # Do not kill teardown tasks + task_ids_of_running_tis = [ti.task_id for ti in running_tis if not dag.task_dict[ti.task_id].is_teardown] - tasks = [] + running_tasks = [] for task in dag.tasks: if task.task_id in task_ids_of_running_tis: task.dag = dag - tasks.append(task) + running_tasks.append(task) # Mark non-finished tasks as SKIPPED. - tis = session.scalars( + pending_tis: list[TaskInstance] = session.scalars( select(TaskInstance).filter( TaskInstance.dag_id == dag.dag_id, TaskInstance.run_id == run_id, @@ -510,12 +510,19 @@ def set_dag_run_state_to_failed( ) ).all() + # Do not skip teardown tasks + pending_normal_tis = [ti for ti in pending_tis if not dag.task_dict[ti.task_id].is_teardown] + if commit: - for ti in tis: + for ti in pending_normal_tis: ti.set_state(TaskInstanceState.SKIPPED) - return tis + set_state( - tasks=tasks, + # Mark the dag run to failed if there is no pending teardown (else this would not be scheduled later). 
+ if not any(dag.task_dict[ti.task_id].is_teardown for ti in (running_tis + pending_tis)): + _set_dag_run_state(dag.dag_id, run_id, DagRunState.FAILED, session) + + return pending_normal_tis + set_state( + tasks=running_tasks, run_id=run_id, state=TaskInstanceState.FAILED, commit=commit, diff --git a/docs/apache-airflow/howto/setup-and-teardown.rst b/docs/apache-airflow/howto/setup-and-teardown.rst index 7afb3c4a350b3..c802c8bedaf9d 100644 --- a/docs/apache-airflow/howto/setup-and-teardown.rst +++ b/docs/apache-airflow/howto/setup-and-teardown.rst @@ -24,8 +24,9 @@ Key features of setup and teardown tasks: * If you clear a task, its setups and teardowns will be cleared. * By default, teardown tasks are ignored for the purpose of evaluating dag run state. - * A teardown task will run if its setup was successful, even if its work tasks failed. + * A teardown task will run if its setup was successful, even if its work tasks failed. But it will skip if the setup was skipped. * Teardown tasks are ignored when setting dependencies against task groups. + * Teardown will also be carried out if the DAG run is manually set to "failed" or "success" to ensure resources will be cleaned-up. How setup and teardown works """""""""""""""""""""""""""" @@ -231,3 +232,8 @@ Trigger rule behavior for teardowns """"""""""""""""""""""""""""""""""" Teardowns use a (non-configurable) trigger rule called ALL_DONE_SETUP_SUCCESS. With this rule, as long as all upstreams are done and at least one directly connected setup is successful, the teardown will run. If all of a teardown's setups were skipped or failed, those states will propagate to the teardown. + +Side-effect on manual DAG state changes +""""""""""""""""""""""""""""""""""""""" + +As teardown tasks are often used to clean-up resources they need to run also if the DAG is manually terminated. For the purpose of early termination a user can manually mark the DAG run as "success" or "failed" which kills all tasks before completion. 
If the DAG contains teardown tasks, they will still be executed. Therefore, as a side effect of allowing teardown tasks to be scheduled, a DAG will not be immediately set to a terminal state if the user requests so. diff --git a/newsfragments/45530.significant.rst b/newsfragments/45530.significant.rst new file mode 100644 index 0000000000000..7e2ae8e8ac6a5 --- /dev/null +++ b/newsfragments/45530.significant.rst @@ -0,0 +1,12 @@ +Ensure teardown tasks are executed when DAG run is set to failed + +Previously when a DAG run was manually set to "failed" or to "success" state the terminal state was set to all tasks. +But this was a gap for cases when setup- and teardown tasks were defined: If teardown was used to clean-up infrastructure +or other resources, they were also skipped and thus resources could stay allocated. + +As of now when setup tasks had been executed before and the DAG is manually set to "failed" or "success" then teardown +tasks are executed. Teardown tasks are skipped if the setup was also skipped. + +As a side effect this means if the DAG contains teardown tasks, then the manual marking of DAG as "failed" or "success" +will need to keep the DAG in running state to ensure that teardown tasks will be scheduled. They would not be scheduled +if the DAG is directly set to "failed" or "success". diff --git a/tests/api/common/test_mark_tasks.py b/tests/api/common/test_mark_tasks.py new file mode 100644 index 0000000000000..124232842d9a0 --- /dev/null +++ b/tests/api/common/test_mark_tasks.py @@ -0,0 +1,76 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +from airflow.api.common.mark_tasks import set_dag_run_state_to_failed, set_dag_run_state_to_success +from airflow.operators.empty import EmptyOperator +from airflow.utils.state import TaskInstanceState + +if TYPE_CHECKING: + from airflow.models.taskinstance import TaskInstance + + from tests_common.pytest_plugin import DagMaker + +pytestmark = pytest.mark.db_test + + +def test_set_dag_run_state_to_failed(dag_maker: DagMaker): + with dag_maker("TEST_DAG_1"): + with EmptyOperator(task_id="teardown").as_teardown(): + EmptyOperator(task_id="running") + EmptyOperator(task_id="pending") + dr = dag_maker.create_dagrun() + for ti in dr.get_task_instances(): + if ti.task_id == "running": + ti.set_state(TaskInstanceState.RUNNING) + dag_maker.session.flush() + assert dr.dag + + updated_tis: list[TaskInstance] = set_dag_run_state_to_failed( + dag=dr.dag, run_id=dr.run_id, commit=True, session=dag_maker.session + ) + assert len(updated_tis) == 2 + task_dict = {ti.task_id: ti for ti in updated_tis} + assert task_dict["running"].state == TaskInstanceState.FAILED + assert task_dict["pending"].state == TaskInstanceState.SKIPPED + assert "teardown" not in task_dict + + +def test_set_dag_run_state_to_success(dag_maker: DagMaker): + with dag_maker("TEST_DAG_1"): + with EmptyOperator(task_id="teardown").as_teardown(): + EmptyOperator(task_id="running") + EmptyOperator(task_id="pending") + dr = dag_maker.create_dagrun() + for ti in dr.get_task_instances(): + if ti.task_id == "running": + 
ti.set_state(TaskInstanceState.RUNNING) + dag_maker.session.flush() + assert dr.dag + + updated_tis: list[TaskInstance] = set_dag_run_state_to_success( + dag=dr.dag, run_id=dr.run_id, commit=True, session=dag_maker.session + ) + assert len(updated_tis) == 2 + task_dict = {ti.task_id: ti for ti in updated_tis} + assert task_dict["running"].state == TaskInstanceState.SUCCESS + assert task_dict["pending"].state == TaskInstanceState.SUCCESS + assert "teardown" not in task_dict From aafcc372af84f8776a85bfa8f1eeede2ca68e32e Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 11 Jan 2025 19:26:48 +0100 Subject: [PATCH 2/2] Remove type hints only working in Airflow 3 --- tests/api/common/test_mark_tasks.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/api/common/test_mark_tasks.py b/tests/api/common/test_mark_tasks.py index 124232842d9a0..0cf58ee74a67a 100644 --- a/tests/api/common/test_mark_tasks.py +++ b/tests/api/common/test_mark_tasks.py @@ -27,12 +27,10 @@ if TYPE_CHECKING: from airflow.models.taskinstance import TaskInstance - from tests_common.pytest_plugin import DagMaker - pytestmark = pytest.mark.db_test -def test_set_dag_run_state_to_failed(dag_maker: DagMaker): +def test_set_dag_run_state_to_failed(dag_maker): with dag_maker("TEST_DAG_1"): with EmptyOperator(task_id="teardown").as_teardown(): EmptyOperator(task_id="running") @@ -54,7 +52,7 @@ def test_set_dag_run_state_to_failed(dag_maker: DagMaker): assert "teardown" not in task_dict -def test_set_dag_run_state_to_success(dag_maker: DagMaker): +def test_set_dag_run_state_to_success(dag_maker): with dag_maker("TEST_DAG_1"): with EmptyOperator(task_id="teardown").as_teardown(): EmptyOperator(task_id="running")