Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions airflow/api/common/experimental/mark_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,24 @@

from sqlalchemy import or_

from airflow.jobs.backfill_job import BackfillJob
from airflow.models import DagRun, TaskInstance
from airflow.models.baseoperator import BaseOperator
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils import timezone
from airflow.utils.session import provide_session
from airflow.utils.state import State
from airflow.utils.types import DagRunType


def _create_dagruns(dag, execution_dates, state, run_id_template):
def _create_dagruns(dag, execution_dates, state, run_type):
"""
Infers from the dates which dag runs need to be created and does so.

:param dag: the dag to create dag runs for
:param execution_dates: list of execution dates to evaluate
:param state: the state to set the dag run to
:param run_id_template:the template for run id to be with the execution date
:param run_type: The prefix will be used to construct dag run id: {run_id_prefix}__{execution_date}
:return: newly created and existing dag runs for the execution dates supplied
"""
# find out if we need to create any dag runs
Expand All @@ -47,7 +48,7 @@ def _create_dagruns(dag, execution_dates, state, run_id_template):

for date in dates_to_create:
dag_run = dag.create_dagrun(
run_id=run_id_template.format(date.isoformat()),
run_id=f"{run_type}__{date.isoformat()}",
execution_date=date,
start_date=timezone.utcnow(),
external_trigger=False,
Expand Down Expand Up @@ -187,7 +188,7 @@ def get_subdag_runs(dag, session, state, task_ids, commit, confirmed_dates):
dag_runs = _create_dagruns(current_task.subdag,
execution_dates=confirmed_dates,
state=State.RUNNING,
run_id_template=BackfillJob.ID_FORMAT_PREFIX)
run_type=DagRunType.BACKFILL_JOB.value)

verify_dagruns(dag_runs, commit, state, session, current_task)

Expand Down
2 changes: 1 addition & 1 deletion airflow/api/common/experimental/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def _trigger_dag(
min_dag_start_date.isoformat()))

if not run_id:
run_id = "{}{}".format(DagRunType.MANUAL.value, execution_date.isoformat())
run_id = f"{DagRunType.MANUAL.value}__{execution_date.isoformat()}"

dag_run_id = dag_run.find(dag_id=dag_id, run_id=run_id)
if dag_run_id:
Expand Down
4 changes: 1 addition & 3 deletions airflow/jobs/backfill_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ class BackfillJob(BaseJob):
triggers a set of task instance runs, in the right order and lasts for
as long as it takes for the set of task instance to be completed.
"""
ID_PREFIX = DagRunType.BACKFILL_JOB.value
ID_FORMAT_PREFIX = ID_PREFIX + '{0}'
STATES_COUNT_AS_RUNNING = (State.RUNNING, State.QUEUED)

__mapper_args__ = {
Expand Down Expand Up @@ -290,7 +288,7 @@ def _get_dag_run(self, run_date: datetime, dag: DAG, session: Session = None):
:param session: the database session object
:return: a DagRun in state RUNNING or None
"""
run_id = BackfillJob.ID_FORMAT_PREFIX.format(run_date.isoformat())
run_id = f"{DagRunType.BACKFILL_JOB.value}__{run_date.isoformat()}"

# consider max_active_runs but ignore when running subdags
respect_dag_max_active_limit = bool(dag.schedule_interval and not dag.is_subdag)
Expand Down
2 changes: 1 addition & 1 deletion airflow/jobs/base_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ def reset_state_for_orphaned_tasks(self, filter_by_dag_run=None, session=None):
TI.execution_date == DR.execution_date))
.filter(
DR.state == State.RUNNING,
DR.run_id.notlike(DagRunType.BACKFILL_JOB.value + '%'),
DR.run_id.notlike(f"{DagRunType.BACKFILL_JOB.value}__%"),
TI.state.in_(resettable_states))).all()
else:
resettable_tis = filter_by_dag_run.get_task_instances(state=resettable_states,
Expand Down
6 changes: 3 additions & 3 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ def create_dag_run(self, dag, dag_runs=None, session=None):
.filter(or_(
DagRun.external_trigger == False, # noqa: E712 pylint: disable=singleton-comparison
# add % as a wildcard for the like query
DagRun.run_id.like(DagRunType.SCHEDULED.value + '%')
DagRun.run_id.like(f"{DagRunType.SCHEDULED.value}__%")
)
)
)
Expand Down Expand Up @@ -644,7 +644,7 @@ def create_dag_run(self, dag, dag_runs=None, session=None):

if next_run_date and period_end and period_end <= timezone.utcnow():
next_run = dag.create_dagrun(
run_id=DagRunType.SCHEDULED.value + next_run_date.isoformat(),
run_id=f"{DagRunType.SCHEDULED.value}__{next_run_date.isoformat()}",
execution_date=next_run_date,
start_date=timezone.utcnow(),
state=State.RUNNING,
Expand Down Expand Up @@ -1135,7 +1135,7 @@ def _find_executable_task_instances(self, simple_dag_bag, session=None):
.outerjoin(
DR, and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date)
)
.filter(or_(DR.run_id.is_(None), not_(DR.run_id.like(DagRunType.BACKFILL_JOB.value + '%'))))
.filter(or_(DR.run_id.is_(None), not_(DR.run_id.like(f"{DagRunType.BACKFILL_JOB.value}__%"))))
.outerjoin(DM, DM.dag_id == TI.dag_id)
.filter(or_(DM.dag_id.is_(None), not_(DM.is_paused)))
.filter(TI.state == State.SCHEDULED)
Expand Down
14 changes: 3 additions & 11 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ class DagRun(Base, LoggingMixin):
"""
__tablename__ = "dag_run"

ID_PREFIX = DagRunType.SCHEDULED.value
ID_FORMAT_PREFIX = ID_PREFIX + '{0}'

id = Column(Integer, primary_key=True)
dag_id = Column(String(ID_LEN))
execution_date = Column(UtcDateTime, default=timezone.utcnow)
Expand Down Expand Up @@ -98,12 +95,7 @@ def set_state(self, state):

@declared_attr
def state(self):
return synonym('_state',
descriptor=property(self.get_state, self.set_state))

@classmethod
def id_for_date(cls, date, prefix=ID_FORMAT_PREFIX):
return prefix.format(date.isoformat()[:19])
return synonym('_state', descriptor=property(self.get_state, self.set_state))

@provide_session
def refresh_from_db(self, session=None):
Expand Down Expand Up @@ -185,7 +177,7 @@ def find(
qry = qry.filter(DR.external_trigger == external_trigger)
if no_backfills:
# in order to prevent a circular dependency
qry = qry.filter(DR.run_id.notlike(DagRunType.BACKFILL_JOB.value + '%'))
qry = qry.filter(DR.run_id.notlike(f"{DagRunType.BACKFILL_JOB.value}__%"))

dr = qry.order_by(DR.execution_date).all()

Expand Down Expand Up @@ -495,7 +487,7 @@ def get_run(session, dag_id, execution_date):
def is_backfill(self):
return (
self.run_id is not None and
self.run_id.startswith(DagRunType.BACKFILL_JOB.value)
self.run_id.startswith(f"{DagRunType.BACKFILL_JOB.value}")
)

@classmethod
Expand Down
2 changes: 1 addition & 1 deletion airflow/ti_deps/deps/dagrun_id_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def _get_dep_statuses(self, ti, session, dep_context=None):
"""
dagrun = ti.get_dagrun(session)

if not dagrun.run_id or not match(DagRunType.BACKFILL_JOB.value + '.*', dagrun.run_id):
if not dagrun.run_id or not match(f"{DagRunType.BACKFILL_JOB.value}.*", dagrun.run_id):
yield self._passing_status(
reason=f"Task's DagRun run_id is either NULL "
f"or doesn't start with {DagRunType.BACKFILL_JOB.value}")
Expand Down
6 changes: 3 additions & 3 deletions airflow/utils/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@

class DagRunType(enum.Enum):
"""Class with DagRun types"""
BACKFILL_JOB = "backfill_"
SCHEDULED = "scheduled__"
MANUAL = "manual__"
BACKFILL_JOB = "backfill"
SCHEDULED = "scheduled"
MANUAL = "manual"
2 changes: 1 addition & 1 deletion airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,7 @@ def trigger(self, session=None):
return redirect(origin)

execution_date = timezone.utcnow()
run_id = "{}{}".format(DagRunType.MANUAL.value, execution_date.isoformat())
run_id = f"{DagRunType.MANUAL.value}__{execution_date.isoformat()}"

dr = DagRun.find(dag_id=dag_id, run_id=run_id)
if dr:
Expand Down
7 changes: 4 additions & 3 deletions tests/api/common/experimental/test_mark_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from airflow.utils.dates import days_ago
from airflow.utils.session import create_session, provide_session
from airflow.utils.state import State
from airflow.utils.types import DagRunType
from tests.test_utils.db import clear_db_runs

DEV_NULL = "/dev/null"
Expand All @@ -57,15 +58,15 @@ def setUp(self):
clear_db_runs()
drs = _create_dagruns(self.dag1, self.execution_dates,
state=State.RUNNING,
run_id_template="scheduled__{}")
run_type=DagRunType.SCHEDULED.value)
for dr in drs:
dr.dag = self.dag1
dr.verify_integrity()

drs = _create_dagruns(self.dag2,
[self.dag2.default_args['start_date']],
state=State.RUNNING,
run_id_template="scheduled__{}")
run_type=DagRunType.SCHEDULED.value)

for dr in drs:
dr.dag = self.dag2
Expand All @@ -74,7 +75,7 @@ def setUp(self):
drs = _create_dagruns(self.dag3,
self.dag3_execution_dates,
state=State.SUCCESS,
run_id_template="manual__{}")
run_type=DagRunType.MANUAL.value)
for dr in drs:
dr.dag = self.dag3
dr.verify_integrity()
Expand Down
3 changes: 1 addition & 2 deletions tests/jobs/test_backfill_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1061,8 +1061,7 @@ def test_sub_set_subdag(self):
drs = DagRun.find(dag_id=dag.dag_id, execution_date=DEFAULT_DATE)
dr = drs[0]

self.assertEqual(BackfillJob.ID_FORMAT_PREFIX.format(DEFAULT_DATE.isoformat()),
dr.run_id)
self.assertEqual(f"{DagRunType.BACKFILL_JOB.value}__{DEFAULT_DATE.isoformat()}", dr.run_id)
for ti in dr.get_task_instances():
if ti.task_id == 'leave1' or ti.task_id == 'leave2':
self.assertEqual(State.SUCCESS, ti.state)
Expand Down
10 changes: 5 additions & 5 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1385,7 +1385,7 @@ def test_execute_task_instances_backfill_tasks_wont_execute(self):
session = settings.Session()

dr1 = dag_file_processor.create_dag_run(dag)
dr1.run_id = DagRunType.BACKFILL_JOB.value + '_blah'
dr1.run_id = f"{DagRunType.BACKFILL_JOB.value}__blah"
ti1 = TaskInstance(task1, dr1.execution_date)
ti1.refresh_from_db()
ti1.state = State.SCHEDULED
Expand All @@ -1412,7 +1412,7 @@ def test_find_executable_task_instances_backfill_nodagrun(self):

dr1 = dag_file_processor.create_dag_run(dag)
dr2 = dag_file_processor.create_dag_run(dag)
dr2.run_id = DagRunType.BACKFILL_JOB.value + 'asdf'
dr2.run_id = f"{DagRunType.BACKFILL_JOB.value}__asdf"

ti_no_dagrun = TaskInstance(task1, DEFAULT_DATE - datetime.timedelta(days=1))
ti_backfill = TaskInstance(task1, dr2.execution_date)
Expand Down Expand Up @@ -2070,12 +2070,12 @@ def test_execute_helper_reset_orphaned_tasks(self):
op1 = DummyOperator(task_id='op1')

dag.clear()
dr = dag.create_dagrun(run_id=DagRunType.SCHEDULED.value,
dr = dag.create_dagrun(run_id=f"{DagRunType.SCHEDULED.value}__",
state=State.RUNNING,
execution_date=DEFAULT_DATE,
start_date=DEFAULT_DATE,
session=session)
dr2 = dag.create_dagrun(run_id=DagRunType.BACKFILL_JOB.value,
dr2 = dag.create_dagrun(run_id=f"{DagRunType.BACKFILL_JOB.value}__",
state=State.RUNNING,
execution_date=DEFAULT_DATE + datetime.timedelta(1),
start_date=DEFAULT_DATE,
Expand Down Expand Up @@ -2922,7 +2922,7 @@ def test_reset_orphaned_tasks_backfill_dag(self):
ti = dr1.get_task_instances(session=session)[0]
ti.state = State.SCHEDULED
dr1.state = State.RUNNING
dr1.run_id = DagRunType.BACKFILL_JOB.value + '_sdfsfdfsd'
dr1.run_id = f"{DagRunType.BACKFILL_JOB.value}__sdfsfdfsd"
session.merge(ti)
session.merge(dr1)
session.commit()
Expand Down
4 changes: 3 additions & 1 deletion tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from airflow.utils.session import create_session
from airflow.utils.state import State
from airflow.utils.timezone import datetime as datetime_tz
from airflow.utils.types import DagRunType
from airflow.utils.weight_rule import WeightRule
from tests.models import DEFAULT_DATE
from tests.test_utils.asserts import assert_queries_count
Expand Down Expand Up @@ -1219,7 +1220,8 @@ def test_schedule_dag_fake_scheduled_previous(self):
start_date=DEFAULT_DATE))

dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
dag.create_dagrun(run_id=DagRun.id_for_date(DEFAULT_DATE),
run_id = f"{DagRunType.SCHEDULED.value}__{DEFAULT_DATE.isoformat()}"
dag.create_dagrun(run_id=run_id,
execution_date=DEFAULT_DATE,
state=State.SUCCESS,
external_trigger=True)
Expand Down
11 changes: 2 additions & 9 deletions tests/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def create_dag_run(self, dag,
if execution_date is None:
execution_date = now
if is_backfill:
run_id = DagRunType.BACKFILL_JOB.value + now.isoformat()
run_id = f"{DagRunType.BACKFILL_JOB.value}__{now.isoformat()}"
else:
run_id = 'manual__' + now.isoformat()
dag_run = dag.create_dagrun(
Expand Down Expand Up @@ -85,13 +85,6 @@ def test_clear_task_instances_for_backfill_dagrun(self):
).first()
self.assertEqual(dr0.state, State.RUNNING)

def test_id_for_date(self):
run_id = models.DagRun.id_for_date(
timezone.datetime(2015, 1, 2, 3, 4, 5, 6))
self.assertEqual(
'scheduled__2015-01-02T03:04:05', run_id,
'Generated run_id did not match expectations: {0}'.format(run_id))

def test_dagrun_find(self):
session = settings.Session()
now = timezone.utcnow()
Expand Down Expand Up @@ -523,7 +516,7 @@ def test_is_backfill(self):
dag = DAG(dag_id='test_is_backfill', start_date=DEFAULT_DATE)

dagrun = self.create_dag_run(dag, execution_date=DEFAULT_DATE)
dagrun.run_id = DagRunType.BACKFILL_JOB.value + '_sfddsffds'
dagrun.run_id = f"{DagRunType.BACKFILL_JOB.value}__sfddsffds"

dagrun2 = self.create_dag_run(
dag, execution_date=DEFAULT_DATE + datetime.timedelta(days=1))
Expand Down
4 changes: 3 additions & 1 deletion tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from airflow.utils.dates import infer_time_unit, round_time, scale_time_units
from airflow.utils.state import State
from airflow.utils.timezone import datetime
from airflow.utils.types import DagRunType
from tests.test_utils.config import conf_vars

DEV_NULL = '/dev/null'
Expand Down Expand Up @@ -469,7 +470,8 @@ def test_externally_triggered_dagrun(self):
start_date=DEFAULT_DATE)
task = DummyOperator(task_id='test_externally_triggered_dag_context',
dag=dag)
dag.create_dagrun(run_id=DagRun.id_for_date(execution_date),
run_id = f"{DagRunType.SCHEDULED.value}__{execution_date.isoformat()}"
dag.create_dagrun(run_id=run_id,
execution_date=execution_date,
state=State.RUNNING,
external_trigger=True)
Expand Down
2 changes: 1 addition & 1 deletion tests/ti_deps/deps/test_dagrun_id_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def test_dagrun_id_is_backfill(self):
Task instances whose dagrun ID is a backfill dagrun ID should fail this dep.
"""
dagrun = DagRun()
dagrun.run_id = DagRunType.BACKFILL_JOB.value + '_something'
dagrun.run_id = f"{DagRunType.BACKFILL_JOB.value}__something"
ti = Mock(get_dagrun=Mock(return_value=dagrun))
self.assertFalse(DagrunIdDep().is_met(ti=ti))

Expand Down
7 changes: 4 additions & 3 deletions tests/www/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
from airflow.utils.session import create_session
from airflow.utils.state import State
from airflow.utils.timezone import datetime
from airflow.utils.types import DagRunType
from airflow.www import app as application
from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_runs
Expand Down Expand Up @@ -322,7 +323,7 @@ def test_index(self):

class TestAirflowBaseViews(TestBase):
EXAMPLE_DAG_DEFAULT_DATE = dates.days_ago(2)
run_id = "test_{}".format(models.DagRun.id_for_date(EXAMPLE_DAG_DEFAULT_DATE))
run_id = f"test_{DagRunType.SCHEDULED.value}__{EXAMPLE_DAG_DEFAULT_DATE.isoformat()}"

@classmethod
def setUpClass(cls):
Expand Down Expand Up @@ -1240,7 +1241,7 @@ class TestDagACLView(TestBase):
"""
next_year = dt.now().year + 1
default_date = timezone.datetime(next_year, 6, 1)
run_id = "test_{}".format(models.DagRun.id_for_date(default_date))
run_id = f"test_{DagRunType.SCHEDULED.value}__{default_date.isoformat()}"

@classmethod
def setUpClass(cls):
Expand Down Expand Up @@ -2252,7 +2253,7 @@ def test_create_dagrun_invalid_conf(self):

class TestDecorators(TestBase):
EXAMPLE_DAG_DEFAULT_DATE = dates.days_ago(2)
run_id = "test_{}".format(models.DagRun.id_for_date(EXAMPLE_DAG_DEFAULT_DATE))
run_id = f"test_{DagRunType.SCHEDULED.value}__{EXAMPLE_DAG_DEFAULT_DATE.isoformat()}"

@classmethod
def setUpClass(cls):
Expand Down