Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
707f252
add 3.0.0a1 tag in airflow bug report
vatsrahul1001 Feb 4, 2025
bdd4ef0
Merge branch 'main' of github.com:astronomer/airflow
vatsrahul1001 Feb 5, 2025
33b49bc
Merge branch 'main' of github.com:astronomer/airflow
vatsrahul1001 Feb 7, 2025
9173085
refactor backfill reprocess behaviour
vatsrahul1001 Feb 7, 2025
798ea5c
undo changes in airflow_bug_report
vatsrahul1001 Feb 7, 2025
b75b194
merge from main
vatsrahul1001 Feb 7, 2025
fe89c16
Merge branch 'main' into AIP83-refactor-backfill-reprocess-behaviour
vatsrahul1001 Feb 7, 2025
b905a83
Merge branch 'main' into AIP83-refactor-backfill-reprocess-behaviour
vatsrahul1001 Feb 7, 2025
9cb7f1c
Merge branch 'main' into AIP83-refactor-backfill-reprocess-behaviour
vatsrahul1001 Feb 10, 2025
abf1927
updating backfill id to dag_run table in case we have cleared dag_run…
vatsrahul1001 Feb 10, 2025
269d4fa
Merge branch 'main' into AIP83-refactor-backfill-reprocess-behaviour
vatsrahul1001 Feb 10, 2025
430a9c1
Merge branch 'main' into AIP83-refactor-backfill-reprocess-behaviour
vatsrahul1001 Feb 10, 2025
e0a6ed3
Merge branch 'main' of github.com:astronomer/airflow into AIP83-refac…
vatsrahul1001 Feb 11, 2025
975cea9
Merge branch 'AIP83-refactor-backfill-reprocess-behaviour' of github.…
vatsrahul1001 Feb 11, 2025
1e6fba1
refactor reprocess approach
vatsrahul1001 Feb 12, 2025
6b8bc7e
Merge branch 'main' into AIP83-refactor-backfill-reprocess-behaviour
vatsrahul1001 Feb 12, 2025
1d27261
Merge branch 'main' into AIP83-refactor-backfill-reprocess-behaviour
vatsrahul1001 Feb 13, 2025
5e12276
fix dry run test and with completed behaviour now failed in status da…
vatsrahul1001 Feb 13, 2025
b91d8e0
Merge branch 'AIP83-refactor-backfill-reprocess-behaviour' of github.…
vatsrahul1001 Feb 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 95 additions & 25 deletions airflow/models/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,16 +200,32 @@ def _get_latest_dag_run_row_query(info, session):
)


def _get_dag_run_no_create_reason(dr, reprocess_behavior: ReprocessBehavior) -> str | None:
non_create_reason = None
def _dag_run_skip_reason_or_clear(
dr, reprocess_behavior: ReprocessBehavior
) -> tuple[BackfillDagRunExceptionReason | None, bool]:
"""Determine whether to clear the DAG run or return a reason for skipping."""
# If DAG run is in progress, it cannot be reprocessed
if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED):
non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT
elif reprocess_behavior is ReprocessBehavior.NONE:
non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS
elif reprocess_behavior is ReprocessBehavior.FAILED:
if dr.state != DagRunState.FAILED:
non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS
return non_create_reason
return BackfillDagRunExceptionReason.IN_FLIGHT, False

# If reprocessing is disabled, indicate that the DAG run already exists
if reprocess_behavior is ReprocessBehavior.NONE:
return BackfillDagRunExceptionReason.ALREADY_EXISTS, False

# Allow clearing if the DAG run state matches the reprocess behavior
if (dr.state == DagRunState.SUCCESS and reprocess_behavior == ReprocessBehavior.COMPLETED) or (
dr.state == DagRunState.FAILED and reprocess_behavior == ReprocessBehavior.FAILED
):
return None, True
# If reprocessing only failed runs, but the DAG run was successful, do not clear
if dr.state != DagRunState.FAILED and reprocess_behavior == ReprocessBehavior.FAILED:
return BackfillDagRunExceptionReason.ALREADY_EXISTS, False

# If reprocessing only completed runs, but the DAG run was failed, do not clear
if dr.state == DagRunState.FAILED and reprocess_behavior == ReprocessBehavior.COMPLETED:
return BackfillDagRunExceptionReason.ALREADY_EXISTS, False

return None, False # Default case: no reason to skip, no need to clear


def _validate_backfill_params(dag, reverse, from_date, to_date, reprocess_behavior: ReprocessBehavior | None):
Expand Down Expand Up @@ -257,7 +273,7 @@ def _do_dry_run(*, dag_id, from_date, to_date, reverse, reprocess_behavior, sess
statement=_get_latest_dag_run_row_query(info, session),
)
if dr:
non_create_reason = _get_dag_run_no_create_reason(dr, reprocess_behavior)
non_create_reason, _ = _dag_run_skip_reason_or_clear(dr, reprocess_behavior)
if not non_create_reason:
logical_dates.append(info.logical_date)
else:
Expand All @@ -266,24 +282,28 @@ def _do_dry_run(*, dag_id, from_date, to_date, reverse, reprocess_behavior, sess


def should_create_backfill_dag_run(
info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session
dag, info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session
) -> bool:
"""Determine if a backfill DAG run should be created and add a BackfillDagRun if required."""
dr = session.scalar(_get_latest_dag_run_row_query(info, session))
dr = session.scalar(
statement=_get_latest_dag_run_row_query(info, session),
)
if not dr:
return False
non_create_reason = _get_dag_run_no_create_reason(dr, reprocess_behavior)
if non_create_reason:
session.add(
BackfillDagRun(
backfill_id=backfill_id,
dag_run_id=None,
logical_date=info.logical_date,
exception_reason=non_create_reason,
sort_ordinal=backfill_sort_ordinal,
)
)

non_create_reason, do_clear_run = _dag_run_skip_reason_or_clear(dr, reprocess_behavior)
print(f"dr is {dr.logical_date}")
print(f"state is {dr.state}")
print(f"non_create_reason is {non_create_reason}")
print(f"do_clear_run is {do_clear_run}")
if non_create_reason and not do_clear_run:
_handle_non_create_reason(session, backfill_id, info, non_create_reason, backfill_sort_ordinal)
return True

if do_clear_run:
_handle_clear_run(session, dag, dr, info, backfill_id, backfill_sort_ordinal)
return True

return False


Expand All @@ -301,7 +321,7 @@ def _create_backfill_dag_run(

with session.begin_nested():
should_skip_create_backfill = should_create_backfill_dag_run(
info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session
dag, info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session
)
if should_skip_create_backfill:
return
Expand Down Expand Up @@ -343,7 +363,7 @@ def _create_backfill_dag_run(
session.rollback()

should_create_backfill_dag_run(
info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session
dag, info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session
)


Expand Down Expand Up @@ -443,3 +463,53 @@ def _create_backfill(
info,
)
return br


def _handle_non_create_reason(session, backfill_id, info, reason, sort_ordinal):
"""Record why a backfill DAG run was not created."""
session.add(
BackfillDagRun(
backfill_id=backfill_id,
dag_run_id=None,
logical_date=info.logical_date,
exception_reason=reason,
sort_ordinal=sort_ordinal,
)
)


def _handle_clear_run(session, dag, dr, info, backfill_id, sort_ordinal):
"""Clear the existing DAG run and update backfill metadata."""
from sqlalchemy.sql import update

from airflow.models import DagRun
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType

dag.clear(
run_id=dr.run_id,
dag_run_state=DagRunState.QUEUED,
session=session,
confirm_prompt=False,
dry_run=False,
)

# Update backfill_id and run_type in DagRun table
session.execute(
update(DagRun)
.where(DagRun.logical_date == info.logical_date)
.values(
backfill_id=backfill_id,
run_type=DagRunType.BACKFILL_JOB,
triggered_by=DagRunTriggeredByType.BACKFILL,
)
)

session.add(
BackfillDagRun(
backfill_id=backfill_id,
dag_run_id=dr.id,
logical_date=info.logical_date,
sort_ordinal=sort_ordinal,
)
)
1 change: 0 additions & 1 deletion tests/api_fastapi/core_api/routes/public/test_backfills.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,6 @@ class TestCreateBackfillDryRun(TestBackfillEndpoint):
[
{"logical_date": "2024-01-01T00:00:00Z"},
{"logical_date": "2024-01-02T00:00:00Z"}, # Reprocess all
{"logical_date": "2024-01-03T00:00:00Z"},
{"logical_date": "2024-01-04T00:00:00Z"},
{"logical_date": "2024-01-05T00:00:00Z"},
],
Expand Down
23 changes: 10 additions & 13 deletions tests/models/test_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,18 +152,17 @@ def test_create_backfill_simple(reverse, existing, dag_maker, session):
assert all(x.conf == expected_run_conf for x in dag_runs)


# Marking test xfail as backfill reprocess behaviour impacted by restoring logical date unique constraints in #46295
# TODO: Fix backfill reprocess behaviour as per #46295
@pytest.mark.xfail(
reason="Backfill reprocess behaviour impacted by restoring logical date unique constraints."
)
@pytest.mark.parametrize(
"reprocess_behavior, run_counts",
[
(
ReprocessBehavior.NONE,
{
"2021-01-01": 1,
"2021-01-03": 1,
"2021-01-04": 1,
"2021-01-06": 1,
"2021-01-07": 1,
"2021-01-09": 1,
},
),
Expand All @@ -173,19 +172,21 @@ def test_create_backfill_simple(reverse, existing, dag_maker, session):
"2021-01-01": 1,
"2021-01-02": 1,
"2021-01-03": 1,
"2021-01-04": 1,
"2021-01-06": 1,
"2021-01-07": 1,
"2021-01-09": 1,
},
),
(
ReprocessBehavior.COMPLETED,
{
"2021-01-01": 1,
"2021-01-02": 1,
"2021-01-03": 1,
"2021-01-04": 1,
"2021-01-05": 1,
"2021-01-06": 1,
"2021-01-07": 1,
"2021-01-09": 1,
},
),
Expand All @@ -210,12 +211,8 @@ def test_reprocess_behavior(reprocess_behavior, run_counts, dag_maker, session):
# whether a dag run is created for backfill depends on
# the last run for a logical date
("2021-01-02", ["failed"]),
("2021-01-03", ["success", "failed"]), # <-- 2021-01-03 is "failed"
("2021-01-04", ["failed", "success"]), # <-- 2021-01-04 is "success"
("2021-01-05", ["success", "success"]),
("2021-01-06", ["failed", "failed"]),
("2021-01-07", ["running", "running"]),
("2021-01-08", ["failed", "running"]),
("2021-01-05", ["success"]),
("2021-01-08", ["running"]),
]
for state in states
]
Expand Down Expand Up @@ -271,7 +268,7 @@ def _get_bdr(date):

# 2021-01-04 is "failed" so it may or may not be reprocessed depending
# on the configuration
bdr = _get_bdr("2021-01-04")
bdr = _get_bdr("2021-01-05")
actual_reason = bdr.exception_reason
if reprocess_behavior is ReprocessBehavior.FAILED:
assert actual_reason == BackfillDagRunExceptionReason.ALREADY_EXISTS
Expand Down
Loading