From 70a0bb7fd5605c526097f1bb16b209707de64e08 Mon Sep 17 00:00:00 2001 From: Tomek Urbaszek Date: Fri, 28 Feb 2020 11:44:46 +0100 Subject: [PATCH] [AIRFLOW-6954] Use DagRunType instead of ID_PREFIX in run_id fixup! [AIRFLOW-6954] Use DagRunType instead of ID_PREFIX in run_id --- airflow/api/common/experimental/mark_tasks.py | 13 +++++++------ airflow/api/common/experimental/trigger_dag.py | 2 +- airflow/jobs/backfill_job.py | 4 +--- airflow/jobs/base_job.py | 2 +- airflow/jobs/scheduler_job.py | 6 +++--- airflow/models/dagrun.py | 14 +++----------- airflow/ti_deps/deps/dagrun_id_dep.py | 2 +- airflow/utils/types.py | 6 +++--- airflow/www/views.py | 2 +- tests/api/common/experimental/test_mark_tasks.py | 7 ++++--- tests/jobs/test_backfill_job.py | 3 +-- tests/jobs/test_scheduler_job.py | 10 +++++----- tests/models/test_dag.py | 4 +++- tests/models/test_dagrun.py | 11 ++--------- tests/test_core.py | 4 +++- tests/ti_deps/deps/test_dagrun_id_dep.py | 2 +- tests/www/test_views.py | 7 ++++--- 17 files changed, 44 insertions(+), 55 deletions(-) diff --git a/airflow/api/common/experimental/mark_tasks.py b/airflow/api/common/experimental/mark_tasks.py index 9eb17dc871f3b..e356833fd3461 100644 --- a/airflow/api/common/experimental/mark_tasks.py +++ b/airflow/api/common/experimental/mark_tasks.py @@ -22,23 +22,24 @@ from sqlalchemy import or_ -from airflow.jobs.backfill_job import BackfillJob -from airflow.models import DagRun, TaskInstance from airflow.models.baseoperator import BaseOperator +from airflow.models.dagrun import DagRun +from airflow.models.taskinstance import TaskInstance from airflow.operators.subdag_operator import SubDagOperator from airflow.utils import timezone from airflow.utils.session import provide_session from airflow.utils.state import State +from airflow.utils.types import DagRunType -def _create_dagruns(dag, execution_dates, state, run_id_template): +def _create_dagruns(dag, execution_dates, state, run_type): """ Infers from the dates which dag runs need to be created and does so. :param dag: the dag to create dag runs for :param execution_dates: list of execution dates to evaluate :param state: the state to set the dag run to - :param run_id_template:the template for run id to be with the execution date + :param run_type: The prefix will be used to construct dag run id: {run_id_prefix}__{execution_date} :return: newly created and existing dag runs for the execution dates supplied """ # find out if we need to create any dag runs @@ -47,7 +48,7 @@ def _create_dagruns(dag, execution_dates, state, run_id_template): for date in dates_to_create: dag_run = dag.create_dagrun( - run_id=run_id_template.format(date.isoformat()), + run_id=f"{run_type}__{date.isoformat()}", execution_date=date, start_date=timezone.utcnow(), external_trigger=False, @@ -187,7 +188,7 @@ def get_subdag_runs(dag, session, state, task_ids, commit, confirmed_dates): dag_runs = _create_dagruns(current_task.subdag, execution_dates=confirmed_dates, state=State.RUNNING, - run_id_template=BackfillJob.ID_FORMAT_PREFIX) + run_type=DagRunType.BACKFILL_JOB.value) verify_dagruns(dag_runs, commit, state, session, current_task) diff --git a/airflow/api/common/experimental/trigger_dag.py b/airflow/api/common/experimental/trigger_dag.py index f71bf5a684660..6165e76b15a9d 100644 --- a/airflow/api/common/experimental/trigger_dag.py +++ b/airflow/api/common/experimental/trigger_dag.py @@ -69,7 +69,7 @@ def _trigger_dag( min_dag_start_date.isoformat())) if not run_id: - run_id = "{}{}".format(DagRunType.MANUAL.value, execution_date.isoformat()) + run_id = f"{DagRunType.MANUAL.value}__{execution_date.isoformat()}" dag_run_id = dag_run.find(dag_id=dag_id, run_id=run_id) if dag_run_id: diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py index 542a3cb85873b..50ea515962d02 100644 --- a/airflow/jobs/backfill_job.py +++ b/airflow/jobs/backfill_job.py @@ -51,8 +51,6 @@ class BackfillJob(BaseJob): triggers a set of task instance runs, in the right order and lasts for as long as it takes for the set of task instance to be completed. """ - ID_PREFIX = DagRunType.BACKFILL_JOB.value - ID_FORMAT_PREFIX = ID_PREFIX + '{0}' STATES_COUNT_AS_RUNNING = (State.RUNNING, State.QUEUED) __mapper_args__ = { @@ -290,7 +288,7 @@ def _get_dag_run(self, run_date: datetime, dag: DAG, session: Session = None): :param session: the database session object :return: a DagRun in state RUNNING or None """ - run_id = BackfillJob.ID_FORMAT_PREFIX.format(run_date.isoformat()) + run_id = f"{DagRunType.BACKFILL_JOB.value}__{run_date.isoformat()}" # consider max_active_runs but ignore when running subdags respect_dag_max_active_limit = bool(dag.schedule_interval and not dag.is_subdag) diff --git a/airflow/jobs/base_job.py b/airflow/jobs/base_job.py index cf85256d9040e..d17427592d5eb 100644 --- a/airflow/jobs/base_job.py +++ b/airflow/jobs/base_job.py @@ -272,7 +272,7 @@ def reset_state_for_orphaned_tasks(self, filter_by_dag_run=None, session=None): TI.execution_date == DR.execution_date)) .filter( DR.state == State.RUNNING, - DR.run_id.notlike(DagRunType.BACKFILL_JOB.value + '%'), + DR.run_id.notlike(f"{DagRunType.BACKFILL_JOB.value}__%"), TI.state.in_(resettable_states))).all() else: resettable_tis = filter_by_dag_run.get_task_instances(state=resettable_states, diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 53e80f9d31e9b..227be9ebaff89 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -558,7 +558,7 @@ def create_dag_run(self, dag, dag_runs=None, session=None): .filter(or_( DagRun.external_trigger == False, # noqa: E712 pylint: disable=singleton-comparison # add % as a wildcard for the like query - DagRun.run_id.like(DagRunType.SCHEDULED.value + '%') + DagRun.run_id.like(f"{DagRunType.SCHEDULED.value}__%") ) ) ) @@ -644,7 +644,7 @@ def create_dag_run(self, dag, dag_runs=None, session=None): if next_run_date and period_end and period_end <= timezone.utcnow(): next_run = dag.create_dagrun( - run_id=DagRunType.SCHEDULED.value + next_run_date.isoformat(), + run_id=f"{DagRunType.SCHEDULED.value}__{next_run_date.isoformat()}", execution_date=next_run_date, start_date=timezone.utcnow(), state=State.RUNNING, @@ -1135,7 +1135,7 @@ def _find_executable_task_instances(self, simple_dag_bag, session=None): .outerjoin( DR, and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date) ) - .filter(or_(DR.run_id.is_(None), not_(DR.run_id.like(DagRunType.BACKFILL_JOB.value + '%')))) + .filter(or_(DR.run_id.is_(None), not_(DR.run_id.like(f"{DagRunType.BACKFILL_JOB.value}__%")))) .outerjoin(DM, DM.dag_id == TI.dag_id) .filter(or_(DM.dag_id.is_(None), not_(DM.is_paused))) .filter(TI.state == State.SCHEDULED) diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index a656b6f17aa89..df213235e6019 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -46,9 +46,6 @@ class DagRun(Base, LoggingMixin): """ __tablename__ = "dag_run" - ID_PREFIX = DagRunType.SCHEDULED.value - ID_FORMAT_PREFIX = ID_PREFIX + '{0}' - id = Column(Integer, primary_key=True) dag_id = Column(String(ID_LEN)) execution_date = Column(UtcDateTime, default=timezone.utcnow) @@ -98,12 +95,7 @@ def set_state(self, state): @declared_attr def state(self): - return synonym('_state', - descriptor=property(self.get_state, self.set_state)) - - @classmethod - def id_for_date(cls, date, prefix=ID_FORMAT_PREFIX): - return prefix.format(date.isoformat()[:19]) + return synonym('_state', descriptor=property(self.get_state, self.set_state)) @provide_session def refresh_from_db(self, session=None): @@ -185,7 +177,7 @@ def find( qry = qry.filter(DR.external_trigger == external_trigger) if no_backfills: # in order to prevent a circular dependency - qry = qry.filter(DR.run_id.notlike(DagRunType.BACKFILL_JOB.value + '%')) + qry = qry.filter(DR.run_id.notlike(f"{DagRunType.BACKFILL_JOB.value}__%")) dr = qry.order_by(DR.execution_date).all() @@ -495,7 +487,7 @@ def get_run(session, dag_id, execution_date): def is_backfill(self): return ( self.run_id is not None and - self.run_id.startswith(DagRunType.BACKFILL_JOB.value) + self.run_id.startswith(f"{DagRunType.BACKFILL_JOB.value}") ) @classmethod diff --git a/airflow/ti_deps/deps/dagrun_id_dep.py b/airflow/ti_deps/deps/dagrun_id_dep.py index 2e995ddda5b0c..697e6fe2462d9 100644 --- a/airflow/ti_deps/deps/dagrun_id_dep.py +++ b/airflow/ti_deps/deps/dagrun_id_dep.py @@ -47,7 +47,7 @@ def _get_dep_statuses(self, ti, session, dep_context=None): """ dagrun = ti.get_dagrun(session) - if not dagrun.run_id or not match(DagRunType.BACKFILL_JOB.value + '.*', dagrun.run_id): + if not dagrun.run_id or not match(f"{DagRunType.BACKFILL_JOB.value}.*", dagrun.run_id): yield self._passing_status( reason=f"Task's DagRun run_id is either NULL " f"or doesn't start with {DagRunType.BACKFILL_JOB.value}") diff --git a/airflow/utils/types.py b/airflow/utils/types.py index 9dd4c1e697887..eaba788524918 100644 --- a/airflow/utils/types.py +++ b/airflow/utils/types.py @@ -19,6 +19,6 @@ class DagRunType(enum.Enum): """Class with DagRun types""" - BACKFILL_JOB = "backfill_" - SCHEDULED = "scheduled__" - MANUAL = "manual__" + BACKFILL_JOB = "backfill" + SCHEDULED = "scheduled" + MANUAL = "manual" diff --git a/airflow/www/views.py b/airflow/www/views.py index a1b3a6c455829..274a1de8f29e4 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -997,7 +997,7 @@ def trigger(self, session=None): return redirect(origin) execution_date = timezone.utcnow() - run_id = "{}{}".format(DagRunType.MANUAL.value, execution_date.isoformat()) + run_id = f"{DagRunType.MANUAL.value}__{execution_date.isoformat()}" dr = DagRun.find(dag_id=dag_id, run_id=run_id) if dr: diff --git a/tests/api/common/experimental/test_mark_tasks.py b/tests/api/common/experimental/test_mark_tasks.py index ca70563b8b8a6..50e4c674b8dec 100644 --- a/tests/api/common/experimental/test_mark_tasks.py +++ b/tests/api/common/experimental/test_mark_tasks.py @@ -32,6 +32,7 @@ from airflow.utils.dates import days_ago from airflow.utils.session import create_session, provide_session from airflow.utils.state import State +from airflow.utils.types import DagRunType from tests.test_utils.db import clear_db_runs DEV_NULL = "/dev/null" @@ -57,7 +58,7 @@ def setUp(self): clear_db_runs() drs = _create_dagruns(self.dag1, self.execution_dates, state=State.RUNNING, - run_id_template="scheduled__{}") + run_type=DagRunType.SCHEDULED.value) for dr in drs: dr.dag = self.dag1 dr.verify_integrity() @@ -65,7 +66,7 @@ def setUp(self): drs = _create_dagruns(self.dag2, [self.dag2.default_args['start_date']], state=State.RUNNING, - run_id_template="scheduled__{}") + run_type=DagRunType.SCHEDULED.value) for dr in drs: dr.dag = self.dag2 @@ -74,7 +75,7 @@ def setUp(self): drs = _create_dagruns(self.dag3, self.dag3_execution_dates, state=State.SUCCESS, - run_id_template="manual__{}") + run_type=DagRunType.MANUAL.value) for dr in drs: dr.dag = self.dag3 dr.verify_integrity() diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py index 10fffefe44414..8c9888d815d8a 100644 --- a/tests/jobs/test_backfill_job.py +++ b/tests/jobs/test_backfill_job.py @@ -1061,8 +1061,7 @@ def test_sub_set_subdag(self): drs = DagRun.find(dag_id=dag.dag_id, execution_date=DEFAULT_DATE) dr = drs[0] - self.assertEqual(BackfillJob.ID_FORMAT_PREFIX.format(DEFAULT_DATE.isoformat()), - dr.run_id) + self.assertEqual(f"{DagRunType.BACKFILL_JOB.value}__{DEFAULT_DATE.isoformat()}", dr.run_id) for ti in dr.get_task_instances(): if ti.task_id == 'leave1' or ti.task_id == 'leave2': self.assertEqual(State.SUCCESS, ti.state) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 6a39ce10c2c37..ee53f401b728b 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -1385,7 +1385,7 @@ def test_execute_task_instances_backfill_tasks_wont_execute(self): session = settings.Session() dr1 = dag_file_processor.create_dag_run(dag) - dr1.run_id = DagRunType.BACKFILL_JOB.value + '_blah' + dr1.run_id = f"{DagRunType.BACKFILL_JOB.value}__blah" ti1 = TaskInstance(task1, dr1.execution_date) ti1.refresh_from_db() ti1.state = State.SCHEDULED @@ -1412,7 +1412,7 @@ def test_find_executable_task_instances_backfill_nodagrun(self): dr1 = dag_file_processor.create_dag_run(dag) dr2 = dag_file_processor.create_dag_run(dag) - dr2.run_id = DagRunType.BACKFILL_JOB.value + 'asdf' + dr2.run_id = f"{DagRunType.BACKFILL_JOB.value}__asdf" ti_no_dagrun = TaskInstance(task1, DEFAULT_DATE - datetime.timedelta(days=1)) ti_backfill = TaskInstance(task1, dr2.execution_date) @@ -2070,12 +2070,12 @@ def test_execute_helper_reset_orphaned_tasks(self): op1 = DummyOperator(task_id='op1') dag.clear() - dr = dag.create_dagrun(run_id=DagRunType.SCHEDULED.value, + dr = dag.create_dagrun(run_id=f"{DagRunType.SCHEDULED.value}__", state=State.RUNNING, execution_date=DEFAULT_DATE, start_date=DEFAULT_DATE, session=session) - dr2 = dag.create_dagrun(run_id=DagRunType.BACKFILL_JOB.value, + dr2 = dag.create_dagrun(run_id=f"{DagRunType.BACKFILL_JOB.value}__", state=State.RUNNING, execution_date=DEFAULT_DATE + datetime.timedelta(1), start_date=DEFAULT_DATE, @@ -2922,7 +2922,7 @@ def test_reset_orphaned_tasks_backfill_dag(self): ti = dr1.get_task_instances(session=session)[0] ti.state = State.SCHEDULED dr1.state = State.RUNNING - dr1.run_id = DagRunType.BACKFILL_JOB.value + '_sdfsfdfsd' + dr1.run_id = f"{DagRunType.BACKFILL_JOB.value}__sdfsfdfsd" session.merge(ti) session.merge(dr1) session.commit() diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 77c2f786a7318..eae06fe91e9a2 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -47,6 +47,7 @@ from airflow.utils.session import create_session from airflow.utils.state import State from airflow.utils.timezone import datetime as datetime_tz +from airflow.utils.types import DagRunType from airflow.utils.weight_rule import WeightRule from tests.models import DEFAULT_DATE from tests.test_utils.asserts import assert_queries_count @@ -1219,7 +1220,8 @@ def test_schedule_dag_fake_scheduled_previous(self): start_date=DEFAULT_DATE)) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) - dag.create_dagrun(run_id=DagRun.id_for_date(DEFAULT_DATE), + run_id = f"{DagRunType.SCHEDULED.value}__{DEFAULT_DATE.isoformat()}" + dag.create_dagrun(run_id=run_id, execution_date=DEFAULT_DATE, state=State.SUCCESS, external_trigger=True) diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py index 6aa7482c18365..4e64ac91a7d6f 100644 --- a/tests/models/test_dagrun.py +++ b/tests/models/test_dagrun.py @@ -43,7 +43,7 @@ def create_dag_run(self, dag, if execution_date is None: execution_date = now if is_backfill: - run_id = DagRunType.BACKFILL_JOB.value + now.isoformat() + run_id = f"{DagRunType.BACKFILL_JOB.value}__{now.isoformat()}" else: run_id = 'manual__' + now.isoformat() dag_run = dag.create_dagrun( @@ -85,13 +85,6 @@ def test_clear_task_instances_for_backfill_dagrun(self): ).first() self.assertEqual(dr0.state, State.RUNNING) - def test_id_for_date(self): - run_id = models.DagRun.id_for_date( - timezone.datetime(2015, 1, 2, 3, 4, 5, 6)) - self.assertEqual( - 'scheduled__2015-01-02T03:04:05', run_id, - 'Generated run_id did not match expectations: {0}'.format(run_id)) - def test_dagrun_find(self): session = settings.Session() now = timezone.utcnow() @@ -523,7 +516,7 @@ def test_is_backfill(self): dag = DAG(dag_id='test_is_backfill', start_date=DEFAULT_DATE) dagrun = self.create_dag_run(dag, execution_date=DEFAULT_DATE) - dagrun.run_id = DagRunType.BACKFILL_JOB.value + '_sfddsffds' + dagrun.run_id = f"{DagRunType.BACKFILL_JOB.value}__sfddsffds" dagrun2 = self.create_dag_run( dag, execution_date=DEFAULT_DATE + datetime.timedelta(days=1)) diff --git a/tests/test_core.py b/tests/test_core.py index 9ea0bf90f91d4..eacccb9655073 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -41,6 +41,7 @@ from airflow.utils.dates import infer_time_unit, round_time, scale_time_units from airflow.utils.state import State from airflow.utils.timezone import datetime +from airflow.utils.types import DagRunType from tests.test_utils.config import conf_vars DEV_NULL = '/dev/null' @@ -469,7 +470,8 @@ def test_externally_triggered_dagrun(self): start_date=DEFAULT_DATE) task = DummyOperator(task_id='test_externally_triggered_dag_context', dag=dag) - dag.create_dagrun(run_id=DagRun.id_for_date(execution_date), + run_id = f"{DagRunType.SCHEDULED.value}__{execution_date.isoformat()}" + dag.create_dagrun(run_id=run_id, execution_date=execution_date, state=State.RUNNING, external_trigger=True) diff --git a/tests/ti_deps/deps/test_dagrun_id_dep.py b/tests/ti_deps/deps/test_dagrun_id_dep.py index 85dabba97a0a7..5dfb8763d949e 100644 --- a/tests/ti_deps/deps/test_dagrun_id_dep.py +++ b/tests/ti_deps/deps/test_dagrun_id_dep.py @@ -31,7 +31,7 @@ def test_dagrun_id_is_backfill(self): Task instances whose dagrun ID is a backfill dagrun ID should fail this dep. """ dagrun = DagRun() - dagrun.run_id = DagRunType.BACKFILL_JOB.value + '_something' + dagrun.run_id = f"{DagRunType.BACKFILL_JOB.value}__something" ti = Mock(get_dagrun=Mock(return_value=dagrun)) self.assertFalse(DagrunIdDep().is_met(ti=ti)) diff --git a/tests/www/test_views.py b/tests/www/test_views.py index fab1528d73146..299a413525e28 100644 --- a/tests/www/test_views.py +++ b/tests/www/test_views.py @@ -53,6 +53,7 @@ from airflow.utils.session import create_session from airflow.utils.state import State from airflow.utils.timezone import datetime +from airflow.utils.types import DagRunType from airflow.www import app as application from tests.test_utils.config import conf_vars from tests.test_utils.db import clear_db_runs @@ -322,7 +323,7 @@ def test_index(self): class TestAirflowBaseViews(TestBase): EXAMPLE_DAG_DEFAULT_DATE = dates.days_ago(2) - run_id = "test_{}".format(models.DagRun.id_for_date(EXAMPLE_DAG_DEFAULT_DATE)) + run_id = f"test_{DagRunType.SCHEDULED.value}__{EXAMPLE_DAG_DEFAULT_DATE.isoformat()}" @classmethod def setUpClass(cls): @@ -1240,7 +1241,7 @@ class TestDagACLView(TestBase): """ next_year = dt.now().year + 1 default_date = timezone.datetime(next_year, 6, 1) - run_id = "test_{}".format(models.DagRun.id_for_date(default_date)) + run_id = f"test_{DagRunType.SCHEDULED.value}__{default_date.isoformat()}" @classmethod def setUpClass(cls): @@ -2252,7 +2253,7 @@ def test_create_dagrun_invalid_conf(self): class TestDecorators(TestBase): EXAMPLE_DAG_DEFAULT_DATE = dates.days_ago(2) - run_id = "test_{}".format(models.DagRun.id_for_date(EXAMPLE_DAG_DEFAULT_DATE)) + run_id = f"test_{DagRunType.SCHEDULED.value}__{EXAMPLE_DAG_DEFAULT_DATE.isoformat()}" @classmethod def setUpClass(cls):