From 707f2527dda922d4a28489b018b61ed3e7ee710b Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Tue, 4 Feb 2025 15:18:47 +0530 Subject: [PATCH 1/6] add 3.0.0a1 tag in airflow bug report --- .github/ISSUE_TEMPLATE/airflow_bug_report.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/ISSUE_TEMPLATE/airflow_bug_report.yml b/.github/ISSUE_TEMPLATE/airflow_bug_report.yml index 5cb424a5a5f29..367d4c0330429 100644 --- a/.github/ISSUE_TEMPLATE/airflow_bug_report.yml +++ b/.github/ISSUE_TEMPLATE/airflow_bug_report.yml @@ -26,8 +26,10 @@ body: multiple: false options: - "2.10.4" + - "3.0.0a1" - "main (development)" - "Other Airflow 2 version (please specify below)" + - validations: required: true - type: input From 91730856f34fb0da93cff162a103f8d17f411c65 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Fri, 7 Feb 2025 19:05:09 +0530 Subject: [PATCH 2/6] refactor backfill reprocess behaviour --- airflow/models/backfill.py | 22 ++++++++++++++++++++++ tests/models/test_backfill.py | 29 ++++++++++++----------------- 2 files changed, 34 insertions(+), 17 deletions(-) diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py index a2347e76fdc6e..92a377dedbd28 100644 --- a/airflow/models/backfill.py +++ b/airflow/models/backfill.py @@ -263,6 +263,7 @@ def _create_backfill_dag_run( backfill_sort_ordinal, session, ): + clear_dag_run(dag, info, reprocess_behavior, session) with session.begin_nested() as nested: dr = session.scalar( with_row_locks( @@ -420,3 +421,24 @@ def _create_backfill( info, ) return br + + +def clear_dag_run(dag, info, reprocess_behavior, session): + dr = session.scalar( + with_row_locks( + query=_get_latest_dag_run_row_query(info, session), + session=session, + ), + ) + if dr: + if dr.state in {DagRunState.SUCCESS, DagRunState.FAILED} and reprocess_behavior in { + ReprocessBehavior.COMPLETED, + ReprocessBehavior.FAILED, + }: + dag.clear( + run_id=dr.run_id, + dag_run_state=DagRunState.QUEUED, + session=session, + confirm_prompt=False, + dry_run=False, + ) diff --git a/tests/models/test_backfill.py b/tests/models/test_backfill.py index 0a1ad5e134921..0ed5291ff5929 100644 --- a/tests/models/test_backfill.py +++ b/tests/models/test_backfill.py @@ -152,11 +152,6 @@ def test_create_backfill_simple(reverse, existing, dag_maker, session): assert all(x.conf == expected_run_conf for x in dag_runs) -# Marking test xfail as backfill reprocess behaviour impacted by restoring logical date unique constraints in #46295 -# TODO: Fix backfill reprocess behaviour as per #46295 -@pytest.mark.xfail( - reason="Backfill reprocess behaviour impacted by restoring logical date unique constraints." -) @pytest.mark.parametrize( "reprocess_behavior, run_counts", [ @@ -164,6 +159,10 @@ def test_create_backfill_simple(reverse, existing, dag_maker, session): ReprocessBehavior.NONE, { "2021-01-01": 1, + "2021-01-03": 1, + "2021-01-04": 1, + "2021-01-06": 1, + "2021-01-07": 1, "2021-01-09": 1, }, ), @@ -171,9 +170,10 @@ def test_create_backfill_simple(reverse, existing, dag_maker, session): ReprocessBehavior.FAILED, { "2021-01-01": 1, - "2021-01-02": 1, "2021-01-03": 1, + "2021-01-04": 1, "2021-01-06": 1, + "2021-01-07": 1, "2021-01-09": 1, }, ), @@ -181,11 +181,10 @@ def test_create_backfill_simple(reverse, existing, dag_maker, session): ReprocessBehavior.COMPLETED, { "2021-01-01": 1, - "2021-01-02": 1, "2021-01-03": 1, "2021-01-04": 1, - "2021-01-05": 1, "2021-01-06": 1, + "2021-01-07": 1, "2021-01-09": 1, }, ), @@ -210,12 +209,8 @@ def test_reprocess_behavior(reprocess_behavior, run_counts, dag_maker, session): # whether a dag run is created for backfill depends on # the last run for a logical date ("2021-01-02", ["failed"]), - ("2021-01-03", ["success", "failed"]), # <-- 2021-01-03 is "failed" - ("2021-01-04", ["failed", "success"]), # <-- 2021-01-04 is "success" - ("2021-01-05", ["success", "success"]), - ("2021-01-06", ["failed", "failed"]), - ("2021-01-07", ["running", "running"]), - ("2021-01-08", ["failed", "running"]), + ("2021-01-05", ["success"]), + ("2021-01-08", ["running"]), ] for state in states ] @@ -271,12 +266,12 @@ def _get_bdr(date): # 2021-01-04 is "failed" so it may or may not be reprocessed depending # on the configuration - bdr = _get_bdr("2021-01-04") + bdr = _get_bdr("2021-01-05") actual_reason = bdr.exception_reason if reprocess_behavior is ReprocessBehavior.FAILED: - assert actual_reason == BackfillDagRunExceptionReason.ALREADY_EXISTS + assert actual_reason == BackfillDagRunExceptionReason.IN_FLIGHT elif reprocess_behavior is ReprocessBehavior.COMPLETED: - assert actual_reason is None + assert actual_reason == BackfillDagRunExceptionReason.IN_FLIGHT elif reprocess_behavior is ReprocessBehavior.NONE: assert actual_reason == BackfillDagRunExceptionReason.ALREADY_EXISTS From 798ea5c4eb040a023c22ec041d1e3b2db67d0cc0 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Fri, 7 Feb 2025 19:09:12 +0530 Subject: [PATCH 3/6] undo changes in airflow_bug_report --- .github/ISSUE_TEMPLATE/airflow_bug_report.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/ISSUE_TEMPLATE/airflow_bug_report.yml b/.github/ISSUE_TEMPLATE/airflow_bug_report.yml index 367d4c0330429..df79c1e96d29c 100644 --- a/.github/ISSUE_TEMPLATE/airflow_bug_report.yml +++ b/.github/ISSUE_TEMPLATE/airflow_bug_report.yml @@ -29,7 +29,6 @@ body: - "3.0.0a1" - "main (development)" - "Other Airflow 2 version (please specify below)" - - validations: required: true - type: input From abf19279296dfe1dca3946fa1a83a1cadb4668b9 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Tue, 11 Feb 2025 00:13:50 +0530 Subject: [PATCH 4/6] updating backfill id to dag_run table in case we have cleared dag_run for reprocess behaviour --- airflow/models/backfill.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py index 5b3a975cedfb5..ad864dcb0170d 100644 --- a/airflow/models/backfill.py +++ b/airflow/models/backfill.py @@ -36,6 +36,7 @@ desc, func, select, + update, ) from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import relationship, validates @@ -57,6 +58,7 @@ from airflow.models.dag import DAG from airflow.timetables.base import DagRunInfo + log = logging.getLogger(__name__) @@ -287,7 +289,7 @@ def _create_backfill_dag_run( session, ): # clear dag run if run exits and reprocess behaviour is in completed or failed - clear_run_if_dagrun_exists(dag, info, reprocess_behavior, session) + clear_run_if_dagrun_exists(dag, info, reprocess_behavior, backfill_id, session) with session.begin_nested(): should_skip_create_backfill = should_create_backfill_dag_run( info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session @@ -436,7 +438,9 @@ def _create_backfill( return br -def clear_run_if_dagrun_exists(dag, info, reprocess_behavior, session): +def clear_run_if_dagrun_exists(dag, info, reprocess_behavior, backfill_id, session): + from airflow.models import DagRun + dr = session.scalar( statement=_get_latest_dag_run_row_query(info, session), ) @@ -452,3 +456,11 @@ def clear_run_if_dagrun_exists(dag, info, reprocess_behavior, session): confirm_prompt=False, dry_run=False, ) + # updating backfill id and run_type in dag_run table + update_backfill_id = ( + update(DagRun) + .where(DagRun.logical_date == info.logical_date) + .values(backfill_id=backfill_id, run_type=DagRunType.BACKFILL_JOB) + ) + session.execute(update_backfill_id) + session.commit() From 1e6fba14cbe480dc514b4a5f32b72950768a0eef Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Thu, 13 Feb 2025 01:38:06 +0530 Subject: [PATCH 5/6] refactor reprocess approach --- airflow/models/backfill.py | 139 ++++++++++++++++++++++------------ tests/models/test_backfill.py | 6 +- 2 files changed, 93 insertions(+), 52 deletions(-) diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py index 552c28614867b..78df1a08dd3f8 100644 --- a/airflow/models/backfill.py +++ b/airflow/models/backfill.py @@ -36,7 +36,6 @@ desc, func, select, - update, ) from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import relationship, validates @@ -193,16 +192,32 @@ def _get_latest_dag_run_row_query(info, session): ) -def _get_dag_run_no_create_reason(dr, reprocess_behavior: ReprocessBehavior) -> str | None: - non_create_reason = None +def _dag_run_skip_reason_or_clear( + dr, reprocess_behavior: ReprocessBehavior +) -> tuple[BackfillDagRunExceptionReason | None, bool]: + """Determine whether to clear the DAG run or return a reason for skipping.""" + # If DAG run is in progress, it cannot be reprocessed if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED): - non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT - elif reprocess_behavior is ReprocessBehavior.NONE: - non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS - elif reprocess_behavior is ReprocessBehavior.FAILED: - if dr.state != DagRunState.FAILED: - non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS - return non_create_reason + return BackfillDagRunExceptionReason.IN_FLIGHT, False + + # If reprocessing is disabled, indicate that the DAG run already exists + if reprocess_behavior is ReprocessBehavior.NONE: + return BackfillDagRunExceptionReason.ALREADY_EXISTS, False + + # Allow clearing if the DAG run state matches the reprocess behavior + if (dr.state == DagRunState.SUCCESS and reprocess_behavior == ReprocessBehavior.COMPLETED) or ( + dr.state == DagRunState.FAILED and reprocess_behavior == ReprocessBehavior.FAILED + ): + return None, True + # If reprocessing only failed runs, but the DAG run was successful, do not clear + if dr.state != DagRunState.FAILED and reprocess_behavior == ReprocessBehavior.FAILED: + return BackfillDagRunExceptionReason.ALREADY_EXISTS, False + + # If reprocessing only completed runs, but the DAG run was failed, do not clear + if dr.state == DagRunState.FAILED and reprocess_behavior == ReprocessBehavior.COMPLETED: + return BackfillDagRunExceptionReason.ALREADY_EXISTS, False + + return None, False # Default case: no reason to skip, no need to clear def _validate_backfill_params(dag, reverse, reprocess_behavior: ReprocessBehavior | None): @@ -247,7 +262,7 @@ def _do_dry_run(*, dag_id, from_date, to_date, reverse, reprocess_behavior, sess statement=_get_latest_dag_run_row_query(info, session), ) if dr: - non_create_reason = _get_dag_run_no_create_reason(dr, reprocess_behavior) + non_create_reason, _ = _dag_run_skip_reason_or_clear(dr, reprocess_behavior) if not non_create_reason: logical_dates.append(info.logical_date) else: @@ -256,24 +271,28 @@ def _do_dry_run(*, dag_id, from_date, to_date, reverse, reprocess_behavior, sess def should_create_backfill_dag_run( - info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session + dag, info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session ) -> bool: """Determine if a backfill DAG run should be created and add a BackfillDagRun if required.""" - dr = session.scalar(_get_latest_dag_run_row_query(info, session)) + dr = session.scalar( + statement=_get_latest_dag_run_row_query(info, session), + ) if not dr: return False - non_create_reason = _get_dag_run_no_create_reason(dr, reprocess_behavior) - if non_create_reason: - session.add( - BackfillDagRun( - backfill_id=backfill_id, - dag_run_id=None, - logical_date=info.logical_date, - exception_reason=non_create_reason, - sort_ordinal=backfill_sort_ordinal, - ) - ) + + non_create_reason, do_clear_run = _dag_run_skip_reason_or_clear(dr, reprocess_behavior) + print(f"dr is {dr.logical_date}") + print(f"state is {dr.state}") + print(f"non_create_reason is {non_create_reason}") + print(f"do_clear_run is {do_clear_run}") + if non_create_reason and not do_clear_run: + _handle_non_create_reason(session, backfill_id, info, non_create_reason, backfill_sort_ordinal) return True + + if do_clear_run: + _handle_clear_run(session, dag, dr, info, backfill_id, backfill_sort_ordinal) + return True + return False @@ -289,11 +308,9 @@ def _create_backfill_dag_run( ): from airflow.models.dagrun import DagRun - # clear dag run if run exits and reprocess behaviour is in completed or failed - clear_run_if_dagrun_exists(dag, info, reprocess_behavior, backfill_id, session) with session.begin_nested(): should_skip_create_backfill = should_create_backfill_dag_run( - info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session + dag, info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session ) if should_skip_create_backfill: return @@ -335,7 +352,7 @@ def _create_backfill_dag_run( session.rollback() should_create_backfill_dag_run( - info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session + dag, info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session ) @@ -437,29 +454,51 @@ def _create_backfill( return br -def clear_run_if_dagrun_exists(dag, info, reprocess_behavior, backfill_id, session): +def _handle_non_create_reason(session, backfill_id, info, reason, sort_ordinal): + """Record why a backfill DAG run was not created.""" + session.add( + BackfillDagRun( + backfill_id=backfill_id, + dag_run_id=None, + logical_date=info.logical_date, + exception_reason=reason, + sort_ordinal=sort_ordinal, + ) + ) + + +def _handle_clear_run(session, dag, dr, info, backfill_id, sort_ordinal): + """Clear the existing DAG run and update backfill metadata.""" + from sqlalchemy.sql import update + from airflow.models import DagRun + from airflow.utils.state import DagRunState + from airflow.utils.types import DagRunType + + dag.clear( + run_id=dr.run_id, + dag_run_state=DagRunState.QUEUED, + session=session, + confirm_prompt=False, + dry_run=False, + ) - dr = session.scalar( - statement=_get_latest_dag_run_row_query(info, session), + # Update backfill_id and run_type in DagRun table + session.execute( + update(DagRun) + .where(DagRun.logical_date == info.logical_date) + .values( + backfill_id=backfill_id, + run_type=DagRunType.BACKFILL_JOB, + triggered_by=DagRunTriggeredByType.BACKFILL, + ) ) - if dr: - if dr.state in {DagRunState.SUCCESS, DagRunState.FAILED} and reprocess_behavior in { - ReprocessBehavior.COMPLETED, - ReprocessBehavior.FAILED, - }: - dag.clear( - run_id=dr.run_id, - dag_run_state=DagRunState.QUEUED, - session=session, - confirm_prompt=False, - dry_run=False, - ) - # updating backfill id and run_type in dag_run table - update_backfill_id = ( - update(DagRun) - .where(DagRun.logical_date == info.logical_date) - .values(backfill_id=backfill_id, run_type=DagRunType.BACKFILL_JOB) + + session.add( + BackfillDagRun( + backfill_id=backfill_id, + dag_run_id=dr.id, + logical_date=info.logical_date, + sort_ordinal=sort_ordinal, ) - session.execute(update_backfill_id) - session.commit() + ) diff --git a/tests/models/test_backfill.py b/tests/models/test_backfill.py index 0ed5291ff5929..3170c59047613 100644 --- a/tests/models/test_backfill.py +++ b/tests/models/test_backfill.py @@ -170,6 +170,7 @@ def test_create_backfill_simple(reverse, existing, dag_maker, session): ReprocessBehavior.FAILED, { "2021-01-01": 1, + "2021-01-02": 1, "2021-01-03": 1, "2021-01-04": 1, "2021-01-06": 1, @@ -183,6 +184,7 @@ def test_create_backfill_simple(reverse, existing, dag_maker, session): "2021-01-01": 1, "2021-01-03": 1, "2021-01-04": 1, + "2021-01-05": 1, "2021-01-06": 1, "2021-01-07": 1, "2021-01-09": 1, @@ -269,9 +271,9 @@ def _get_bdr(date): bdr = _get_bdr("2021-01-05") actual_reason = bdr.exception_reason if reprocess_behavior is ReprocessBehavior.FAILED: - assert actual_reason == BackfillDagRunExceptionReason.IN_FLIGHT + assert actual_reason == BackfillDagRunExceptionReason.ALREADY_EXISTS elif reprocess_behavior is ReprocessBehavior.COMPLETED: - assert actual_reason == BackfillDagRunExceptionReason.IN_FLIGHT + assert actual_reason is None elif reprocess_behavior is ReprocessBehavior.NONE: assert actual_reason == BackfillDagRunExceptionReason.ALREADY_EXISTS From 5e122760ca5227cbd1e34f16f8037b6788d54efb Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Thu, 13 Feb 2025 11:59:36 +0530 Subject: [PATCH 6/6] fix dry run test and with completed behaviour now failed in status dagrun would not be executed --- tests/api_fastapi/core_api/routes/public/test_backfills.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/api_fastapi/core_api/routes/public/test_backfills.py b/tests/api_fastapi/core_api/routes/public/test_backfills.py index 97812dfd91a3a..286d3cc5b6d7b 100644 --- a/tests/api_fastapi/core_api/routes/public/test_backfills.py +++ b/tests/api_fastapi/core_api/routes/public/test_backfills.py @@ -338,7 +338,6 @@ class TestCreateBackfillDryRun(TestBackfillEndpoint): [ {"logical_date": "2024-01-01T00:00:00Z"}, {"logical_date": "2024-01-02T00:00:00Z"}, # Reprocess all - {"logical_date": "2024-01-03T00:00:00Z"}, {"logical_date": "2024-01-04T00:00:00Z"}, {"logical_date": "2024-01-05T00:00:00Z"}, ],