def _dag_run_skip_reason_or_clear(
    dr, reprocess_behavior: ReprocessBehavior
) -> tuple[BackfillDagRunExceptionReason | None, bool]:
    """
    Decide how a backfill should treat a DAG run that already exists.

    :param dr: the existing DAG run; only ``dr.state`` is read.
    :param reprocess_behavior: the reprocess policy requested for the backfill.
    :return: a ``(skip_reason, do_clear)`` pair. ``skip_reason`` is set when the
        run must be left alone, ``do_clear`` is ``True`` when the run should be
        cleared and re-run. Both are never set together; ``(None, False)`` is the
        fallback for a ``reprocess_behavior`` this function does not recognise.
    """
    # A run that is neither SUCCESS nor FAILED is still in progress and cannot be reprocessed.
    if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED):
        return BackfillDagRunExceptionReason.IN_FLIGHT, False

    # Reprocessing disabled: a finished run always blocks its logical date.
    if reprocess_behavior is ReprocessBehavior.NONE:
        return BackfillDagRunExceptionReason.ALREADY_EXISTS, False

    reprocess_failed = reprocess_behavior == ReprocessBehavior.FAILED
    reprocess_completed = reprocess_behavior == ReprocessBehavior.COMPLETED
    if not (reprocess_failed or reprocess_completed):
        # Default case: no reason to skip, no need to clear.
        return None, False

    # From here on dr.state is SUCCESS or FAILED. FAILED behaviour clears FAILED
    # runs only; COMPLETED behaviour clears SUCCESS runs only.
    # NOTE(review): COMPLETED leaves FAILED runs untouched here - confirm that is intended.
    run_failed = dr.state == DagRunState.FAILED
    if run_failed == reprocess_failed:
        return None, True
    return BackfillDagRunExceptionReason.ALREADY_EXISTS, False


def should_create_backfill_dag_run(
    dag, info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session
) -> bool:
    """
    Reconcile one backfill logical date with a DAG run that may already exist for it.

    Looks up the latest DAG run for ``info`` and, when one exists, either records
    why it is skipped or clears it for re-execution. In both cases a
    ``BackfillDagRun`` row is added to ``session``.

    :param dag: the DAG being backfilled; used to clear the existing run.
    :param info: run info whose ``logical_date`` is being backfilled.
    :param reprocess_behavior: the reprocess policy requested for the backfill.
    :param backfill_id: id of the backfill the new ``BackfillDagRun`` row belongs to.
    :param backfill_sort_ordinal: ordinal stored on the ``BackfillDagRun`` row.
    :param session: the SQLAlchemy session to read from and write to.
    :return: ``True`` when an existing run was handled here, in which case the
        caller returns without creating a new DAG run; ``False`` when no run
        exists or nothing was recorded.
    """
    dr = session.scalar(
        statement=_get_latest_dag_run_row_query(info, session),
    )
    if not dr:
        return False

    non_create_reason, do_clear_run = _dag_run_skip_reason_or_clear(dr, reprocess_behavior)

    # The helper never returns a reason together with do_clear_run=True, so the
    # two branches below are mutually exclusive.
    if non_create_reason:
        _handle_non_create_reason(session, backfill_id, info, non_create_reason, backfill_sort_ordinal)
        return True

    if do_clear_run:
        _handle_clear_run(session, dag, dr, info, backfill_id, backfill_sort_ordinal)
        return True

    return False


def _handle_non_create_reason(session, backfill_id, info, reason, sort_ordinal):
    """
    Record why a backfill DAG run was not created.

    Adds a ``BackfillDagRun`` row with no ``dag_run_id`` and ``reason`` stored as
    its ``exception_reason``.
    """
    session.add(
        BackfillDagRun(
            backfill_id=backfill_id,
            dag_run_id=None,
            logical_date=info.logical_date,
            exception_reason=reason,
            sort_ordinal=sort_ordinal,
        )
    )


def _handle_clear_run(session, dag, dr, info, backfill_id, sort_ordinal):
    """
    Clear the existing DAG run and update backfill metadata.

    Clears ``dr`` back to ``QUEUED`` through ``dag.clear``, re-tags that DagRun
    row as a backfill run owned by ``backfill_id``, and adds a ``BackfillDagRun``
    row pointing at it.
    """
    from sqlalchemy.sql import update

    from airflow.models import DagRun
    from airflow.utils.state import DagRunState
    from airflow.utils.types import DagRunTriggeredByType, DagRunType

    dag.clear(
        run_id=dr.run_id,
        dag_run_state=DagRunState.QUEUED,
        session=session,
        confirm_prompt=False,
        dry_run=False,
    )

    # Update backfill_id and run_type on the cleared run only. Matching on
    # logical_date alone would also re-tag runs of other DAGs that share it.
    session.execute(
        update(DagRun)
        .where(DagRun.id == dr.id, DagRun.logical_date == info.logical_date)
        .values(
            backfill_id=backfill_id,
            run_type=DagRunType.BACKFILL_JOB,
            triggered_by=DagRunTriggeredByType.BACKFILL,
        )
    )

    session.add(
        BackfillDagRun(
            backfill_id=backfill_id,
            dag_run_id=dr.id,
            logical_date=info.logical_date,
            sort_ordinal=sort_ordinal,
        )
    )
b/tests/api_fastapi/core_api/routes/public/test_backfills.py index 04f61d736b152..b8464dc4a0e97 100644 --- a/tests/api_fastapi/core_api/routes/public/test_backfills.py +++ b/tests/api_fastapi/core_api/routes/public/test_backfills.py @@ -404,7 +404,6 @@ class TestCreateBackfillDryRun(TestBackfillEndpoint): [ {"logical_date": "2024-01-01T00:00:00Z"}, {"logical_date": "2024-01-02T00:00:00Z"}, # Reprocess all - {"logical_date": "2024-01-03T00:00:00Z"}, {"logical_date": "2024-01-04T00:00:00Z"}, {"logical_date": "2024-01-05T00:00:00Z"}, ], diff --git a/tests/models/test_backfill.py b/tests/models/test_backfill.py index 0a1ad5e134921..3170c59047613 100644 --- a/tests/models/test_backfill.py +++ b/tests/models/test_backfill.py @@ -152,11 +152,6 @@ def test_create_backfill_simple(reverse, existing, dag_maker, session): assert all(x.conf == expected_run_conf for x in dag_runs) -# Marking test xfail as backfill reprocess behaviour impacted by restoring logical date unique constraints in #46295 -# TODO: Fix backfill reprocess behaviour as per #46295 -@pytest.mark.xfail( - reason="Backfill reprocess behaviour impacted by restoring logical date unique constraints." 
-) @pytest.mark.parametrize( "reprocess_behavior, run_counts", [ @@ -164,6 +159,10 @@ def test_create_backfill_simple(reverse, existing, dag_maker, session): ReprocessBehavior.NONE, { "2021-01-01": 1, + "2021-01-03": 1, + "2021-01-04": 1, + "2021-01-06": 1, + "2021-01-07": 1, "2021-01-09": 1, }, ), @@ -173,7 +172,9 @@ def test_create_backfill_simple(reverse, existing, dag_maker, session): "2021-01-01": 1, "2021-01-02": 1, "2021-01-03": 1, + "2021-01-04": 1, "2021-01-06": 1, + "2021-01-07": 1, "2021-01-09": 1, }, ), @@ -181,11 +182,11 @@ def test_create_backfill_simple(reverse, existing, dag_maker, session): ReprocessBehavior.COMPLETED, { "2021-01-01": 1, - "2021-01-02": 1, "2021-01-03": 1, "2021-01-04": 1, "2021-01-05": 1, "2021-01-06": 1, + "2021-01-07": 1, "2021-01-09": 1, }, ), @@ -210,12 +211,8 @@ def test_reprocess_behavior(reprocess_behavior, run_counts, dag_maker, session): # whether a dag run is created for backfill depends on # the last run for a logical date ("2021-01-02", ["failed"]), - ("2021-01-03", ["success", "failed"]), # <-- 2021-01-03 is "failed" - ("2021-01-04", ["failed", "success"]), # <-- 2021-01-04 is "success" - ("2021-01-05", ["success", "success"]), - ("2021-01-06", ["failed", "failed"]), - ("2021-01-07", ["running", "running"]), - ("2021-01-08", ["failed", "running"]), + ("2021-01-05", ["success"]), + ("2021-01-08", ["running"]), ] for state in states ] @@ -271,7 +268,7 @@ def _get_bdr(date): # 2021-01-04 is "failed" so it may or may not be reprocessed depending # on the configuration - bdr = _get_bdr("2021-01-04") + bdr = _get_bdr("2021-01-05") actual_reason = bdr.exception_reason if reprocess_behavior is ReprocessBehavior.FAILED: assert actual_reason == BackfillDagRunExceptionReason.ALREADY_EXISTS