Skip to content
124 changes: 68 additions & 56 deletions airflow/models/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
func,
select,
)
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import relationship, validates
from sqlalchemy_jsonfield import JSONField

Expand All @@ -46,7 +47,7 @@
from airflow.settings import json
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.sqlalchemy import UtcDateTime, nulls_first, with_row_locks
from airflow.utils.sqlalchemy import UtcDateTime, nulls_first
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType

Expand Down Expand Up @@ -253,6 +254,28 @@ def _do_dry_run(*, dag_id, from_date, to_date, reverse, reprocess_behavior, sess
return logical_dates


def should_create_backfill_dag_run(
    info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session
) -> bool:
    """
    Check for an existing DAG run that blocks backfill creation and, if blocked, record why.

    Despite the name, the return value means "skip creation": True when an
    existing DAG run for this logical date prevents creating a new backfill
    run (a ``BackfillDagRun`` marker row carrying the exception reason is
    added to the session); False when the caller should proceed to create
    the run.

    NOTE(review): the name reads inverted relative to the semantics — the
    call site binds the result to ``should_skip_create_backfill``. A rename
    (or returning the reason itself) would make this clearer; per the review
    thread this was addressed in a follow-up PR.

    :param info: run info object providing the candidate run's ``logical_date``
    :param reprocess_behavior: policy controlling whether an existing run may be redone
    :param backfill_id: id of the owning Backfill row
    :param backfill_sort_ordinal: ordering of this run within the backfill
    :param session: active SQLAlchemy session; the marker row is only added, not flushed here
    """
    # Latest existing DAG run for this logical date. No existing run means
    # there is no obstacle, so the caller should create one (return False).
    dr = session.scalar(_get_latest_dag_run_row_query(info, session))
    if not dr:
        return False
    # A non-empty reason means policy forbids re-creating this run; persist a
    # placeholder BackfillDagRun (dag_run_id=None) explaining the skip.
    non_create_reason = _get_dag_run_no_create_reason(dr, reprocess_behavior)
    if non_create_reason:
        session.add(
            BackfillDagRun(
                backfill_id=backfill_id,
                dag_run_id=None,
                logical_date=info.logical_date,
                exception_reason=non_create_reason,
                sort_ordinal=backfill_sort_ordinal,
            )
        )
        return True
    return False


def _create_backfill_dag_run(
*,
dag: DAG,
Expand All @@ -263,57 +286,54 @@ def _create_backfill_dag_run(
backfill_sort_ordinal,
session,
):
with session.begin_nested() as nested:
dr = session.scalar(
with_row_locks(
query=_get_latest_dag_run_row_query(info, session),
session=session,
),
with session.begin_nested():
should_skip_create_backfill = should_create_backfill_dag_run(
info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session
)
if dr:
non_create_reason = _get_dag_run_no_create_reason(dr, reprocess_behavior)
if non_create_reason:
# rolling back here restores to start of this nested tran
# which releases the lock on the latest dag run, since we
# are not creating a new one
nested.rollback()
session.add(
BackfillDagRun(
backfill_id=backfill_id,
dag_run_id=None,
logical_date=info.logical_date,
exception_reason=non_create_reason,
sort_ordinal=backfill_sort_ordinal,
)
)
return
if should_skip_create_backfill:
return

dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)
dr = dag.create_dagrun(
run_id=dag.timetable.generate_run_id(
run_type=DagRunType.BACKFILL_JOB,
try:
dr = dag.create_dagrun(
run_id=dag.timetable.generate_run_id(
run_type=DagRunType.BACKFILL_JOB,
logical_date=info.logical_date,
data_interval=info.data_interval,
),
logical_date=info.logical_date,
data_interval=info.data_interval,
),
logical_date=info.logical_date,
data_interval=info.data_interval,
run_after=info.run_after,
conf=dag_run_conf,
run_type=DagRunType.BACKFILL_JOB,
triggered_by=DagRunTriggeredByType.BACKFILL,
dag_version=dag_version,
state=DagRunState.QUEUED,
start_date=timezone.utcnow(),
backfill_id=backfill_id,
session=session,
)
session.add(
BackfillDagRun(
run_after=info.run_after,
conf=dag_run_conf,
run_type=DagRunType.BACKFILL_JOB,
triggered_by=DagRunTriggeredByType.BACKFILL,
dag_version=dag_version,
state=DagRunState.QUEUED,
start_date=timezone.utcnow(),
backfill_id=backfill_id,
dag_run_id=dr.id,
sort_ordinal=backfill_sort_ordinal,
logical_date=info.logical_date,
session=session,
)
session.add(
BackfillDagRun(
backfill_id=backfill_id,
dag_run_id=dr.id,
sort_ordinal=backfill_sort_ordinal,
logical_date=info.logical_date,
)
)
except IntegrityError:
log.info(
"Skipped creating backfill dag run for dag_id=%s backfill_id=%s, logical_date=%s (already exists)",
dr.dag_id,
dr.id,
info.logical_date,
)
log.info("Doing session rollback.")
session.rollback()

should_create_backfill_dag_run(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you know why we are calling should_create_backfill_dag_run here?

this seems like it might be a mistake?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, should_create_backfill_dag_run is called to create a backfill dag run record if the insert fails.
I created a separate function for this so it can be reused in both the try block and the error handling.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But if insert fails, wouldn’t a new insert still fail?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The try block creates a backfill run by inserting into the dag_run table, and if that fails, the error handling here inserts a row into the backfill_dag_run table with the non_create_reason.
This was suggested here - #46248 (comment)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there are any concerns I can address them in another PR along with adding a test

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the logic is (simplified)

  1. Check if there’s an existing run
  2. If so, return
  3. Otherwise, try to create a new run
  4. If that succeeds, good.
  5. Otherwise, race condition? Go to should_create_backfill_dag_run again; in this case, _get_latest_dag_run_row_query should return that conflicting run instead, and we only insert a new BackfillDagRun without a new DagRun.

It might be better if it’s clearer that the DagRun fetched in the second should_create_backfill_dag_run call should be a different one, but I’m not exactly sure how. Maybe call _get_latest_dag_run_row_query in _create_backfill_dag_run instead? Not sure.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah the problem here is in part naming.

a function called "should_create_x" reads like it will just give you information. so you would not expect it to be creating records, or to depend on it to create a record.

otherwise it's confusing. you could either get rid of this method, or limit it to only give info (e.g. maybe just return BackfillDagRunExceptionReason | None and do the insert conditionally at call site)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolved in #46959

info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session
)
)


def _get_info_list(
Expand Down Expand Up @@ -389,16 +409,8 @@ def _create_backfill(
dag=dag,
)

log.info("obtaining lock on dag %s", dag_id)
# we obtain a lock on dag model so that nothing else will create
# dag runs at the same time. mainly this is required by non-uniqueness
# of logical_date. otherwise we could just create run in a try-except.
dag_model = session.scalar(
with_row_locks(
select(DagModel).where(DagModel.dag_id == dag_id),
session=session,
)
)
dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id))

if not dag_model:
raise RuntimeError(f"Dag {dag_id} not found")

Expand Down